I have a Node.js service that consumes messages from Kafka and runs them through several steps of transformation logic. During processing, the service uses Redis and MongoDB for storage and caching. At the end, it sends the transformed message to another destination as UDP packets.
On startup, it begins consuming messages from Kafka, and after a while it crashes with the unhandled error: ERR_CANNOT_SEND unable to send data (see the picture below). Restarting the application resolves the issue temporarily. I initially thought it might have to do with the forwarding through UDP sockets, but the forwarding destinations are reachable from the consumer!
I'd appreciate any help here; I'm kind of stuck.
Consumer code:
const readFromKafka = ({host, topic, source}, transformationService) => {
  const logger = createChildLogger(`kafka-consumer-${topic}`);
  const options = {
    // connect directly to kafka broker (instantiates a KafkaClient)
    kafkaHost: host,
    groupId: `${topic}-group`,
    protocol: ['roundrobin'], // and so on the other kafka config.
  };
  logger.info(`starting kafka consumer on ${host} for ${topic}`);
  const consumer = new ConsumerGroup(options, [topic]);
  consumer.on('error', (err) => logger.error(err));
  consumer.on('message', async ({value, offset}) => {
    logger.info(`recieved ${topic}`, value);
    if (value) {
      const final = await transformationService([
        JSON.parse(Buffer.from(value, 'binary').toString()),
      ]);
      logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
    } else {
      logger.error(`invalid message: ${topic} ${value}`);
    }
    return;
  });
  consumer.on('rebalanced', () => {
    logger.info('cosumer is rebalancing');
  });
  return consumer;
};
Consumer Service startup and error handling code:
// init is the async function used to initialise the cache and other config and components.
const init = async () => {
  // initialize cache, configs.
};
// startConsumer is the async function that connects to Kafka,
// and adds a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
  // calls to fetch info like topic, transformationService etc.
  // readFromKafka function defn pasted above
  readFromKafka( {topicConfig}, transformationService);
};
init()
  .then(startConsumer)
  .catch((err) => {
    logger.error(err);
  });
Forwarding code using UDP sockets. The following code throws the unhandled error intermittently: it worked for the first few thousand messages and then suddenly crashed.
const udpSender = (msg, destinations) => {
  return Object.values(destinations)
    .map(({id, host, port}) => {
      return new Promise((resolve) => {
        dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
          resolve({
            id,
            timestamp: Date.now(),
            logs: err || 'Sent succesfully',
          });
        });
      });
    });
};
Based on our comment exchange, I believe the issue is just that you're running out of resources.
Throughout the lifetime of your app, every time you send a message you open up a brand new socket. However, you're not doing any cleanup after sending that message, and so that socket stays open indefinitely. Your open sockets then continue to pile up, consuming resources, until you eventually run out of... something. Perhaps memory, perhaps ports, perhaps something else, but ultimately your app crashes.
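To make the leak concrete: the smallest possible change to the original code would be to close each socket once its send has finished. This is only a sketch, and `sendOnce` is a hypothetical helper name rather than anything from your code:

```javascript
const dgram = require('dgram');

// Sends one datagram on a throwaway socket and always closes that socket afterwards.
// Resolves with null on success or with the Error on failure.
const sendOnce = (msg, host, port) =>
  new Promise((resolve) => {
    const sock = dgram.createSocket('udp4');
    let done = false;
    const finish = (err) => {
      if (done) return; // make sure we close and resolve only once
      done = true;
      try {
        sock.close(); // releases the file descriptor and the local port
      } catch (_) {
        // close() throws if the socket has already been closed
      }
      resolve(err || null);
    };
    sock.on('error', finish); // socket-level errors arrive as events
    sock.send(msg, port, host, finish); // send errors arrive in the callback
  });
```

This stops the pile-up, though it still creates and binds a fresh socket for every single message.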
Luckily, the solution isn't too convoluted: just reuse existing sockets. In fact, you can just reuse one socket for the entirety of the application if you wanted, as internally socket.send handles queueing for you, so no need to do any smart hand-offs. However, if you wanted a little more concurrency, here's a quick implementation of a round-robin pool, where we create 10 sockets in advance and just grab the next one whenever we want to send a message:
const dgram = require('dgram');

const MAX_CONCURRENT_SOCKETS = 10;
let rrIndex = 0; // index of the next socket to hand out

// Create the whole pool once, up front, instead of one socket per message.
const rrSocketPool = (() => {
  const arr = [];
  for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
    arr.push(dgram.createSocket('udp4'));
  }
  return arr;
})();

const udpSender = (msg, destinations) => {
  return Object.values(destinations)
    .map(({ id, host, port }) => {
      return new Promise((resolve) => {
        // Take the next socket in round-robin order.
        const sock = rrSocketPool[rrIndex];
        rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
        sock.send(msg, 0, msg.length, port, host, (err) => {
          resolve({
            id,
            timestamp: Date.now(),
            logs: err || 'Sent succesfully',
          });
        });
      });
    });
};
Be aware that this implementation is still naïve for a few reasons, mostly because there's still no error handling on the sockets themselves, only on their .send method. You should look at the docs for more info about catching events such as error events, especially if this is a production server that's supposed to run indefinitely, but basically the error-handling you've put inside your .send callback will only work... if an error occurs in a call to .send. If between sending messages, while your sockets are idle, some system-level error outside of your control occurs and causes your sockets to break, your socket may then emit an error event, which will go unhandled (like what's happening in your current implementation, with the intermittent errors that you see prior to the fatal one). At that point they may now be permanently unusable, meaning they should be replaced/reinstated or otherwise dealt with (or alternatively, just force the app to restart and call it a day, like I do :-) ).
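If you'd rather replace a broken socket than restart, here is a rough sketch of the idea for the single-shared-socket case. The names `getSocket` and `sendWithSharedSocket` are made up for illustration, and the "throw the socket away on any error" policy is an assumption on my part, not something the docs prescribe:

```javascript
const dgram = require('dgram');

let sharedSocket = null;

// Returns the current socket, lazily creating one with an 'error' listener attached.
const getSocket = () => {
  if (sharedSocket) return sharedSocket;
  const sock = dgram.createSocket('udp4');
  sock.on('error', (err) => {
    // Without this listener, an 'error' event would crash the process.
    console.error('UDP socket error, discarding socket:', err.message);
    if (sharedSocket === sock) sharedSocket = null; // the next send creates a fresh socket
    try {
      sock.close();
    } catch (_) {
      // close() throws if the socket has already been closed
    }
  });
  sharedSocket = sock;
  return sock;
};

// Resolves with null on success or with the Error reported by send.
const sendWithSharedSocket = (msg, host, port) =>
  new Promise((resolve) => {
    getSocket().send(msg, port, host, (err) => resolve(err || null));
  });
```

The same listener could be attached to every socket in a pool, swapping out only the entry that failed.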