apache-kafka kafkajs

In KafkaJS, do I need to pause and resume the consumer when processing a batch?


I use eachBatch() in KafkaJS to process batches of messages. Each batch is written to a database, which can take a while under heavy load. I'm worried that the database might get overloaded if multiple batches are inserted simultaneously.


Solution

  • When you use eachBatch(), KafkaJS awaits the promise returned by your handler before it delivers the next batch. With the default partitionsConsumedConcurrently of 1, batches are therefore processed one at a time, so inserts from the same consumer do not overlap and you don't need to pause and resume the consumer manually around each batch.

    However, if you want to throttle consumption explicitly, for example to back off while the database is struggling, you can use the pause() and resume() methods. Both take an array of topics, such as [{ topic: 'my-topic' }], optionally narrowed to specific partitions. A paused topic is skipped from the next fetch onward, so messages that have already been fetched may still be delivered; calling resume() with the same topics restarts fetching.

    Here's an example of how you can use the pause() and resume() methods:

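    const { Kafka } = require('kafkajs')

    const kafka = new Kafka({
      clientId: 'my-app',
      brokers: ['localhost:9092']
    })

    const topic = 'my-topic'
    const consumer = kafka.consumer({ groupId: 'my-group' })

    // Placeholder: replace with your actual database insert.
    const writeToDB = async (batch) => { /* insert batch.messages */ }

    const processBatch = async ({ batch }) => {
      // pause() and resume() take an array of topics to act on.
      consumer.pause([{ topic: batch.topic }])
      try {
        await writeToDB(batch)
      } finally {
        // Resume even if the write throws, so the topic is not left paused.
        consumer.resume([{ topic: batch.topic }])
      }
    }

    // In a CommonJS module, await must be used inside an async function.
    const run = async () => {
      await consumer.connect()
      await consumer.subscribe({ topic, fromBeginning: true })

      await consumer.run({
        eachBatch: async ({ batch }) => {
          await processBatch({ batch })
        }
      })
    }

    run().catch(console.error)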
    

    In this example, processBatch() pauses consumption before writing the batch to the database and resumes it once the batch has been written. Because eachBatch() is already awaited, the next batch is not delivered until the current one has finished, so the pause is an extra safeguard rather than a requirement.
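
If partitionsConsumedConcurrently is raised above 1, eachBatch() can run for several partitions at the same time, and that is the case where simultaneous inserts become possible. One way to cap them without pausing the consumer is a small concurrency limiter around the database write. The following is a minimal sketch, not part of KafkaJS: createLimiter and limitDbWrite are hypothetical names, and writeToDB stands for your own insert function.

```javascript
// Hypothetical helper (not a KafkaJS API): lets at most `maxConcurrent`
// async tasks run at once; further calls wait for a free slot.
function createLimiter(maxConcurrent) {
  let active = 0        // number of slots currently occupied
  const waiting = []    // resolvers for callers waiting for a slot

  const release = () => {
    const next = waiting.shift()
    if (next) {
      next()            // hand the occupied slot directly to the next waiter
    } else {
      active--          // nobody is waiting, so free the slot
    }
  }

  return async function limit(task) {
    if (active >= maxConcurrent) {
      // Wait until a finishing task hands its slot over.
      await new Promise(resolve => waiting.push(resolve))
    } else {
      active++
    }
    try {
      return await task()
    } finally {
      release()
    }
  }
}

// Assumed usage inside a consumer (writeToDB is your own function):
//
//   const limitDbWrite = createLimiter(2)
//   await consumer.run({
//     partitionsConsumedConcurrently: 4,
//     eachBatch: async ({ batch }) => limitDbWrite(() => writeToDB(batch))
//   })
```

With a limit of 2, a third eachBatch() call that arrives while two writes are in flight waits until one of them settles, so the database never sees more than two inserts from this consumer at once.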