I have an edge in Kafkajs consumer, where at times I get a rebalancing error:
The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining
Then, once the consumer group is rebalanced, the last message that was processed is processed again, as a commit did not occur due to the error.
Kafka consumer initialzation code:
import { Consumer, Kafka } from 'kafkajs';
const kafkaInstance = new Kafka({
clientId: 'some_client_id',
brokers: ['brokers list'],
ssl: true
});
const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in order to control committing
eachMessage: ... some processing function
});
I increased sessionTimeout
& heartbeatInteval
to higher values and different combinations, but still under heavy message load, I get the error.
I added a call to heartbeat
function inside eachMessage
function, which seems to resolve the issue.
But was wondering if it's considered as "good practice" or is there something else I can do on the consumer side in order to prevent such error?
I added a call to heartbeat
function inside of eachMessage
function, which seems to resolve the issue.