java apache-kafka spring-kafka

How to send batched data with a Spring Kafka producer


Currently I have code like this:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for (Pet p : myData) {
  String json = objectWriter.writeValueAsString(p);
  kafkaTemplate.send(topic, json);
}

So each list item is sent one by one. How do I send the entire list at once?


Solution

  • There is no direct way to send messages to Kafka in bulk using KafkaTemplate or KafkaProducer. Neither has a method that accepts a List of objects and sends each one as a separate record to its partition.

    How does the Kafka producer send messages to Kafka?

    KafkaProducer

    The Kafka producer groups records into batches and sends each batch in a single request. The documentation describes it as follows:

    The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

    The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
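    A practical consequence is that the loop in the question already gives the producer what it needs to batch: each send() call only buffers a record. The sketch below is a toy model of that behaviour, not Kafka's actual implementation. ToyBatchingProducer, ToyBatchingDemo and the simple byte limit are hypothetical names invented for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model only: a simplified stand-in for the producer's record buffer,
// not Kafka's actual implementation.
class ToyBatchingProducer {
    private final int batchSizeBytes;                               // plays the role of batch.size
    private final List<String> open = new ArrayList<>();            // records waiting in the buffer
    private final List<List<String>> requests = new ArrayList<>();  // batches "sent" so far
    private int openBytes = 0;

    ToyBatchingProducer(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Like send(): only buffers the record and returns immediately.
    void send(String value) {
        int size = value.getBytes(StandardCharsets.UTF_8).length;
        if (!open.isEmpty() && openBytes + size > batchSizeBytes) {
            closeBatch(); // the open batch is full, so it goes out as one request
        }
        open.add(value);
        openBytes += size;
    }

    // Like flush(): pushes out whatever is still buffered.
    void flush() {
        if (!open.isEmpty()) {
            closeBatch();
        }
    }

    private void closeBatch() {
        requests.add(new ArrayList<>(open));
        open.clear();
        openBytes = 0;
    }

    List<List<String>> requests() {
        return requests;
    }
}

class ToyBatchingDemo {
    public static void main(String[] args) {
        ToyBatchingProducer producer = new ToyBatchingProducer(10);
        for (String json : List.of("aaaa", "bbbb", "cccc", "dddd", "eeee")) {
            producer.send(json); // one call per record, as in the question
        }
        producer.flush();
        System.out.println(producer.requests());
    }
}
```

    With a 10-byte limit, the five 4-byte records leave in three requests: [[aaaa, bbbb], [cccc, dddd], [eeee]]. On a real KafkaTemplate or KafkaProducer, flush() plays the same role of forcing out anything still buffered after the loop.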

    Asynchronous send

    Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
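    The two bounds in that passage correspond to the producer settings batch.size and linger.ms. The sketch below sets them to the 64k and 10 ms figures from the quote, using only java.util.Properties so it runs without Kafka on the classpath. BatchingConfigSketch and batchingProps are hypothetical names, and the broker address is the one used in the example further down. Note that batch.size is measured in bytes per partition, not in number of messages.

```java
import java.util.Properties;

class BatchingConfigSketch {
    // Builds producer settings keyed by Kafka's configuration property names.
    static Properties batchingProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.setProperty("batch.size", "65536"); // upper bound on one batch, in bytes (64 KB)
        props.setProperty("linger.ms", "10");     // wait up to 10 ms for more records
        return props;
    }

    public static void main(String[] args) {
        batchingProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

    A larger linger.ms gives the producer more time to fill each batch at the cost of added latency. A Properties object like this can be passed to the KafkaProducer constructor once the key and value serializers are also set.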

    Since you are using spring-kafka, you can send a List<Object> as the record value. Note that this sends a single record containing a JSON array of all the objects to one topic partition, rather than one record per JSON object.

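    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    // Template whose record value is a whole list, serialized as one JSON array
    public KafkaTemplate<String, List<Object>> createTemplate() {
        Map<String, Object> senderProps = producerProps();
        // Type parameters must match the template: String keys, List<Object> values
        ProducerFactory<String, List<Object>> pf =
                new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    public Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // max batch size in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);            // wait up to 1 ms to fill a batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // total buffer memory in bytes
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }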
    
    KafkaTemplate<String, List<Object>> kafkaTemplate;