My node server receives about 400 UDP messages in one second, and it all works, and I am able to process all 400 of them.
However, when I start to receive about 700 UDP messages in one second, I lose 2-20 of the messages, and they never get parsed :(
I have thought about some options here:
Here's what my UDP receiver looks like:
var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
udpserver.on("message",
    function (msg, rinfo)
    {
        seatStateStore.parseMessage(msg.toString());
    });
Anyone have any ideas? I couldn't figure out any of the 3 options :/ Can someone help me out?
Node v0.10.29
Express v3.14.0
===============================
Here's the code I ended up using (a slightly modified version of @RoyHB's solution):
var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();
fetcher();
udpserver.on("message",
    function (msg, rinfo)
    {
        FIFO.push(msg.toString());
    });
udpserver.bind(43278);
function fetcher () {
    while (FIFO.length > 0)
    {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    setImmediate(fetcher); //make this function continuously run
}
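There is an NPM module called node-dequeue. I use it a lot for situations similar to yours.
Basically, the "message" handler only pushes each received message onto the queue, and a separate fetcher function, run from an interval timer, pulls messages off the queue and processes them.
Alternatively, maybe preferably, you can have the fetcher reschedule itself (with process.nextTick or, so that socket I/O still gets a turn between passes, setImmediate) to continuously check the queue for messages.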
Ideally, seatStateStore.parseMessage would create a new object to asynchronously process one message, so that parseMessage returns without delay while the actual message processing continues in the background (see the bottom of the example code).
I haven't tested the code below; it's meant to illustrate, not to run.
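var Dequeue = require('dequeue');
var FIFO = new Dequeue(); // the module exports a constructor, so create an instance
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");
setInterval(fetcher, 1); // drain the queue roughly every millisecond
var udpserver = dgram.createSocket("udp4");
udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg.toString()); // only enqueue here; parsing happens in fetcher
    }
);
udpserver.bind(43278); // same port as in the question
function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
}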
** OR (maybe better) **
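var Dequeue = require('dequeue');
var FIFO = new Dequeue(); // the module exports a constructor, so create an instance
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");
fetcher();
var udpserver = dgram.createSocket("udp4");
udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg.toString()); // only enqueue here; parsing happens in fetcher
    }
);
udpserver.bind(43278); // same port as in the question
function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    // Reschedule once per pass, outside the loop. setImmediate is used instead of
    // process.nextTick because nextTick callbacks run before pending I/O, so a
    // fetcher that keeps re-queuing itself with nextTick would starve the socket.
    setImmediate(fetcher);
}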
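A possible variation on the fetcher loops above, offered as a rough sketch rather than tested production code: cap how many messages are handled per pass, and only schedule a pass while something is queued, so the process does not spin when idle and the socket gets a turn between passes. The names createQueueWorker, maxPerPass, pending, and runPass are made up for this illustration, and a plain array stands in for the dequeue instance.

```javascript
// Hypothetical helper (not part of node-dequeue): a queue that drains itself
// in bounded passes scheduled with setImmediate.
function createQueueWorker(handler, maxPerPass) {
    var queue = [];        // plain array standing in for the dequeue instance
    var scheduled = false; // true while a pass is already waiting to run

    function schedule() {
        if (!scheduled) {
            scheduled = true;
            setImmediate(pass);
        }
    }

    function pass() {
        scheduled = false;
        var handled = 0;
        // Handle at most maxPerPass messages, oldest first.
        while (queue.length > 0 && handled < maxPerPass) {
            handler(queue.shift());
            handled++;
        }
        // Leftover messages wait for the next pass, after pending I/O has run.
        if (queue.length > 0) {
            schedule();
        }
    }

    return {
        push: function (msg) { queue.push(msg); schedule(); },
        pending: function () { return queue.length; },
        runPass: pass // exposed only so a pass can be triggered by hand
    };
}
```

In the server this would presumably be wired up by calling worker.push(msg.toString()) inside the "message" handler, with seatStateStore.parseMessage passed in as the handler.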
Outline of seatStateStore.parseMessage:
seatStateStore.parseMessage = function (msg) {
    // asyncProcHandler is a placeholder for whatever does the real work asynchronously
    var proc = new asyncProcHandler(msg, function (err) {
        if (err) {
            //handle the error
        }
    });
};
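To make the outline a bit more concrete, here is one way asyncProcHandler could look. This is only a guess at its shape: the outline above does not define it, processMessage is a made-up stand-in for the real per-message work, and the extra result argument passed to the callback is an assumption.

```javascript
// Stand-in for the real per-message work (assumption: trimming is a placeholder).
function processMessage(msg) {
    return String(msg).trim();
}

// Assumed shape of asyncProcHandler: it returns immediately and runs the work
// on a later turn of the event loop, reporting through callback(err, result).
function asyncProcHandler(msg, callback) {
    setImmediate(function () {
        var result;
        try {
            result = processMessage(msg);
        } catch (err) {
            callback(err);
            return;
        }
        callback(null, result);
    });
}
```

Note that this only defers the work to a later turn of Node's single-threaded event loop, so parseMessage returns quickly, but the work itself does not run in parallel with the rest of the program.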