scala, cloudera-cdh, kafka-consumer-api, kafka-producer-api

Kafka producer creates topic but is not able to send messages


I'm new to Scala and Kafka and I've run into some trouble.

I'm trying to connect a Scala Kafka producer to a Kafka server that is installed on a Cloudera Express server. I have done this once before in VMs with these instructions and had no problems.

When I run the producer, the desired topic is created, but none of the messages seem to be sent.

Here is the relevant code:

Kafka producer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaProducerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("acks", "all")
    props.put("retries", "2")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("block.on.buffer.full", "true")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("auto.create.topics.enable", "true")

    val producer = new KafkaProducer[String, String](props)

    def startCounter() {
        println("Start Producer Counter")
        for (i <- 1 to 100) {
            producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
            println("Producer - Send: " + i)
        }

        println("Closing producer")
        producer.close()
    }
}

When I run startCounter(), I see "Producer - Send: #" for each message in the output and I get no errors, so I assume this code has sent the messages to Kafka.
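Note that send is asynchronous: it returns once the record has been placed in the producer's buffer, so a println right after it does not prove that the broker accepted anything. A minimal sketch of how delivery errors could be surfaced, assuming the kafka-clients library is on the classpath (the object name SendWithCallback and the helper sendAndReport are hypothetical, not part of the original code):

```scala
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}

object SendWithCallback {

  // Hypothetical helper: sends one record and prints what the broker answered.
  // Accepts the Producer interface, so a KafkaProducer[String, String] fits.
  def sendAndReport(producer: Producer[String, String],
                    topic: String, key: String, value: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, value)
    producer.send(record, new Callback {
      // Called once the send completes; exactly one argument is non-null.
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null)
          println("Send failed for key " + key + ": " + exception)
        else
          println("Acked key " + key + ": partition=" + metadata.partition() +
            ", offset=" + metadata.offset())
      }
    })
  }
}
```

Calling SendWithCallback.sendAndReport(producer, "test-counter", i.toString, "Package " + i) inside the loop, followed by producer.flush() before close(), should make a connection or metadata problem show up as a printed exception instead of passing silently.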

I started the following on the kafka server before I ran the producer:

 kafka-console-consumer --zookeeper zk:2181 --topic test-counter

But nothing shows up there.

When I list the topics, the one the producer is supposed to create is in the list.

kafka-topics --zookeeper zk:2181 --list

I also have a similar problem with the consumer:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

class KafkaConsumerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("group.id", "testGroup")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("session.timeout.ms", "3000")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)

    val consumer = new KafkaConsumer[String, String](props)

    def start() {
        println("Start Consumer")
        consumer.subscribe(Arrays.asList("test-counter"))

        while (true) {
            val records = consumer.poll(100)
            val iterator = records.iterator()

            while (iterator.hasNext) {
                val record = iterator.next()
                printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
            }
        }
    }
}

When I create messages on the server via kafka-console-producer, they appear in the kafka-console-consumer on the server, but not in the consumer I wrote.

kafka-console-producer --broker-list ks:9092 --topic test-counter

KafkaServer.ZOOKEEPER_ADDRESS has the same value as the zk:2181 argument passed to kafka-console-consumer, and KafkaServer.KAFKA_ADDRESS has the same value as the ks:9092 argument passed to kafka-console-producer.


Solution

  • I tried the code and found that:

    After performing the corrections the code works.
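The individual corrections are not listed above. As one hedged illustration of the kind of change involved: KafkaConsumer from org.apache.kafka.clients.consumer expects key.deserializer and value.deserializer settings rather than serializers, and it connects through bootstrap.servers rather than zookeeper.connect. A minimal sketch of such a configuration, using only java.util.Properties (the object name ConsumerConfigSketch and the helper consumerProps are hypothetical):

```scala
import java.util.Properties

object ConsumerConfigSketch {

  // Hypothetical helper: builds properties for a String/String KafkaConsumer.
  def consumerProps(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    // A consumer reads bytes from the broker, so it needs deserializers.
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }
}
```

The result can be passed directly to the constructor, for example new KafkaConsumer[String, String](ConsumerConfigSketch.consumerProps(KafkaServer.KAFKA_ADDRESS, "testGroup")). Whether this alone resolves the problem in the question is an assumption; it only removes settings that belong to the producer or to the older ZooKeeper-based consumer.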