I want to disable the Kafka internal log which I didn't write. Below code is my basic kafka producer code.
package me.sclee.kafka.basic.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.UUID;
import static me.sclee.kafka.basic.config.BasicKafkaConfig.*;
public class BasicKafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(BasicKafkaProducer.class);
private static KafkaProducer<String, String> getKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.ACKS_CONFIG, ACK);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
props.put(ProducerConfig.RETRIES_CONFIG, RETIRES);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
props.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SER);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SER);
return new KafkaProducer<>(props);
}
public void send() throws Exception {
logger.info("Sending kafka message..");
KafkaProducer<String, String> producer = null;
int line = 3;
try {
for (int n = 0; n < line; n++) {
producer = getKafkaProducer();
String uuid = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, uuid);
producer.send(record, new ProducerCallBack(n));
Thread.sleep(1000);
}
} catch (Exception e) {
logger.error("There was a problem while sending a message in producer, {}" + e.getMessage());
throw e;
} finally {
if (producer != null) {
producer.flush();
producer.close();
}
}
logger.info("Exited the Kafka sending..");
}
public static void main(String[] args) throws Exception {
BasicKafkaProducer basicKafkaProducer = new BasicKafkaProducer();
basicKafkaProducer.send();
}
}
As you can see, I put some logs for my trace but when I executed I saw many other logs which is generated by the internal log as below.
I want to see my log in the code for the better understanding. How to achieve it?
I used the following slf4j libs.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-alpha1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.0-alpha1</version>
</dependency>
console
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = snappy
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 5
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 1
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1642705323290
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: oXvU2NlRS8m6jKC3Zh1FcA
[kafka-producer-network-thread | producer-1] INFO me.sclee.kafka.basic.producer.ProducerCallBack - Producer sends a message. Topic : 1642705323550, partition : 0, offset : 139, line : 0
[Timer-0] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-2
compression.type = snappy
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 5
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 1
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1642705324831
[kafka-producer-network-thread | producer-2] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-2] Cluster ID: oXvU2NlRS8m6jKC3Zh1FcA
[kafka-producer-network-thread | producer-2] INFO me.sclee.kafka.basic.producer.ProducerCallBack - Producer sends a message. Topic : 1642705324834, partition : 0, offset : 140, line : 1
[Timer-0] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-3
compression.type = snappy
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 5
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 1
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1642705325844
[kafka-producer-network-thread | producer-3] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-3] Cluster ID: oXvU2NlRS8m6jKC3Zh1FcA
[kafka-producer-network-thread | producer-3] INFO me.sclee.kafka.basic.producer.ProducerCallBack - Producer sends a message. Topic : 1642705325848, partition : 0, offset : 141, line : 2
You can create your own src/main/resources/log4j.properties
file and configure whatever levels/packages/formats you wish.
For example,
log4j.logger.kafka=OFF
log4j.logger.org.apache.kafka=OFF
Defaults (for the broker and clients) are defined here https://github.com/apache/kafka/blob/trunk/config/log4j.properties