I'm new to Scala and Kafka and I've run into some trouble.
I'm trying to connect a Scala Kafka producer to a Kafka server installed on a Cloudera Express server. I have done this once before in VMs with these instructions and didn't have any problems.
When I run the producer, the desired topic is created, but it looks like none of the messages are sent.
Here is some of the code:
Kafka producer
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaProducerManager {
  val props = new Properties()
  props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
  props.put("acks", "all")
  props.put("retries", "2")
  props.put("auto.commit.interval.ms", "1000")
  props.put("linger.ms", "1")
  props.put("block.on.buffer.full", "true")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("auto.create.topics.enable", "true")
  val producer = new KafkaProducer[String, String](props)
  def startCounter(): Unit = {
    println("Start Producer Counter")
    for (i <- 1 to 100) {
      producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
      println("Producer - Send: " + i)
    }
    println("Closing producer")
    producer.close()
  }
}
When I execute the run method, I see "Producer - Send: #" for every message and get no errors, so I assume this code has sent the messages to Kafka.
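Seeing "Producer - Send: #" only proves that `send` was called: `KafkaProducer.send` is asynchronous, and delivery errors are reported through the returned future or a `Callback`, not thrown at the call site. A minimal sketch for making the outcome visible, assuming kafka-clients is on the classpath; `DeliveryCheck`, `producerProps`, `logResult` and `sendCounter` are hypothetical names, and the property set is deliberately reduced to producer-side keys:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper object, not part of the original post.
object DeliveryCheck {
  // Only producer settings; consumer/broker keys would just be logged as unknown.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  // Prints the broker's answer for each record instead of assuming success.
  val logResult: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null) println(s"Send failed: ${exception.getMessage}")
      else println(s"Acked: partition=${metadata.partition()}, offset=${metadata.offset()}")
  }

  def sendCounter(bootstrapServers: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](producerProps(bootstrapServers))
    try {
      for (i <- 1 to 100)
        producer.send(new ProducerRecord[String, String](topic, i.toString, "Package " + i), logResult)
      producer.flush() // wait until every outstanding send has completed or failed
    } finally producer.close()
  }
}
```

Calling `DeliveryCheck.sendCounter("ks:9092", "test-counter")` should then print one "Acked" or "Send failed" line per record, so connection or metadata problems show up explicitly rather than looking like a silent success.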
Before running the producer, I started the following on the Kafka server:
kafka-console-consumer --zookeeper zk:2181 --topic test-counter
But nothing shows up there.
When I list the topics, the topic that the producer is supposed to create is in the list:
kafka-topics --zookeeper zk:2181 --list
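Note that `kafka-topics --zookeeper` reads topic metadata from ZooKeeper, while the Scala clients only talk to the brokers listed in `bootstrap.servers`. A small sketch for checking the topic over the same path the clients use, assuming kafka-clients is on the classpath; `TopicCheck`, `clientProps` and `topicVisible` are hypothetical names. It relies on `KafkaConsumer.listTopics()`, which returns a map from topic name to partition info:

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper object, not part of the original post.
object TopicCheck {
  // Minimal consumer settings: no group is needed just to list topics.
  def clientProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props
  }

  // True if the broker reachable at bootstrapServers reports the topic.
  def topicVisible(bootstrapServers: String, topic: String): Boolean = {
    val consumer = new KafkaConsumer[String, String](clientProps(bootstrapServers))
    try consumer.listTopics().containsKey(topic)
    finally consumer.close()
  }
}
```

Running `TopicCheck.topicVisible("ks:9092", "test-counter")` from the client machine exercises the broker address rather than ZooKeeper, which is the connection the producer and consumer actually depend on.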
I also have a similar problem with the consumer:
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
class KafkaConsumerManager {
  val props = new Properties()
  props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
  props.put("group.id", "testGroup")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  props.put("linger.ms", "1")
  props.put("session.timeout.ms", "3000")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)
  val consumer = new KafkaConsumer[String, String](props)
  def start(): Unit = {
    println("Start Consumer")
    consumer.subscribe(Arrays.asList("test-counter"))
    while (true) {
      val records = consumer.poll(100)
      val iterator = records.iterator()
      while (iterator.hasNext) {
        val record = iterator.next()
        printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
      }
    }
  }
}
When I produce messages on the server with kafka-console-producer, they appear in the kafka-console-consumer on the server, but not in the consumer I wrote.
kafka-console-producer --broker-list ks:9092 --topic test-counter
KafkaServer.ZOOKEEPER_ADDRESS is the same zk:2181 that I pass to kafka-console-consumer, and KafkaServer.KAFKA_ADDRESS is the same ks:9092 that I pass to kafka-console-producer.
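The post never shows the `KafkaServer` object itself. A hypothetical reconstruction that matches the values stated above might look like this; the comments reflect that the Java producer and consumer clients connect through `bootstrap.servers`, so the ZooKeeper address only matters for the ZooKeeper-based CLI tools in this setup:

```scala
// Hypothetical reconstruction: only the two values are given in the post.
object KafkaServer {
  val KAFKA_ADDRESS = "ks:9092"     // broker address, used as bootstrap.servers by the clients
  val ZOOKEEPER_ADDRESS = "zk:2181" // used by kafka-console-consumer / kafka-topics --zookeeper
}
```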
I tried the code and found two problems:
1. You should specify key and value deserializers (not serializers) in the consumer properties:
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
2. There is a problem with the session.timeout.ms property. From the Kafka consumer configuration docs:
heartbeat.interval.ms - ... The value must be set lower than session.timeout.ms ... default: 3000
Your code sets session.timeout.ms to 3000, which is not lower than the default heartbeat.interval.ms of 3000. This means you should either increase your session.timeout.ms value or simply remove the line, because the default value of that property is 30000, which is greater than the default heartbeat.interval.ms.
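The rule from the second point can be expressed as a tiny pre-flight check. This is a sketch with a hypothetical `TimeoutCheck` helper that only uses `java.util.Properties`; the fallback values are the defaults quoted above, and defaults can differ between Kafka versions:

```scala
import java.util.Properties

// Hypothetical helper, not a Kafka API.
object TimeoutCheck {
  // Defaults as quoted in the answer; check the docs for your Kafka version.
  val DefaultHeartbeatMs = 3000
  val DefaultSessionTimeoutMs = 30000

  // Reads an integer property, falling back to the given default when absent.
  private def intProp(props: Properties, key: String, default: Int): Int =
    Option(props.getProperty(key)).map(_.trim.toInt).getOrElse(default)

  // True when heartbeat.interval.ms is strictly lower than session.timeout.ms.
  def heartbeatFitsSession(props: Properties): Boolean =
    intProp(props, "heartbeat.interval.ms", DefaultHeartbeatMs) <
      intProp(props, "session.timeout.ms", DefaultSessionTimeoutMs)
}
```

With the question's `props.put("session.timeout.ms", "3000")` the check returns false (3000 is not lower than 3000); without that line it falls back to 30000 and returns true.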
After performing these corrections, the code works.
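Putting both corrections together, the consumer might look roughly like the sketch below. Assumptions: kafka-clients 2.0 or later (for `poll(Duration)`; older clients use `consumer.poll(100)` as in the question), and the broker address is passed in as a hypothetical `bootstrapServers` constructor parameter instead of `KafkaServer.KAFKA_ADDRESS`. Beyond the two fixes above, it also drops `linger.ms` (a producer setting) and `zookeeper.connect`, which this client does not use:

```scala
import java.time.Duration
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaConsumerManager(bootstrapServers: String) {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  // Fix 1: deserializers on the consumer side.
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  // Fix 2: session.timeout.ms left at its default, which exceeds heartbeat.interval.ms.

  // Created on first use, so building the config does not touch the network.
  lazy val consumer = new KafkaConsumer[String, String](props)

  def start(): Unit = {
    println("Start Consumer")
    consumer.subscribe(Arrays.asList("test-counter"))
    while (true) {
      val records = consumer.poll(Duration.ofMillis(100))
      val iterator = records.iterator()
      while (iterator.hasNext) {
        val record = iterator.next()
        printf("Consumer: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())
      }
    }
  }
}
```

Usage would be `new KafkaConsumerManager("ks:9092").start()`, which blocks in the poll loop just like the original.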