javascript node.js sockets express udp

Node.js: Receiving too many UDP messages at a time, losing them


My node server receives about 400 UDP messages in one second, and it all works, and I am able to process all 400 of them.

However, when I start to receive about 700 UDP messages in one second, I lose 2-20 of the messages, and they never get parsed :(

I have thought about some options here:

  1. Create a queue of all the socket messages, then consume them one by one
    • I can't figure out how to implement this
  2. Find a setting in Node / Express / the dgram socket where I can increase the memory size / buffer size, something like that
    • I couldn't find any settings like this, though :(
  3. Use a different UDP receiver and stop using Node's built-in UDP socket receiver
    • I didn't find any other receivers
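
A note on option 2: Node v0.10 does not seem to expose the socket buffer size, but newer Node releases (v8.7.0 and later, as far as I know) accept a `recvBufferSize` option in `dgram.createSocket` and also provide `socket.setRecvBufferSize()`. Below is a minimal sketch, assuming such a newer Node. The `createUdpReceiver` helper is a hypothetical name for illustration, not something from my code.

```javascript
'use strict';
const dgram = require('dgram');

// Hypothetical helper (not part of Node or of the original code): creates a
// UDP socket that asks the OS for a larger receive buffer, so short bursts
// can wait in the kernel instead of being dropped.
function createUdpReceiver(onMessage, recvBufferSize) {
    const socket = dgram.createSocket({
        type: 'udp4',
        recvBufferSize: recvBufferSize // bytes requested; applied when the socket binds
    });

    socket.on('message', function (msg, rinfo) {
        onMessage(msg.toString(), rinfo);
    });

    socket.on('listening', function () {
        // The granted size can only be read once the socket is bound.
        console.log('receive buffer:', socket.getRecvBufferSize(), 'bytes');
    });

    return socket;
}
```

Usage would be along the lines of `createUdpReceiver(function (text) { seatStateStore.parseMessage(text); }, 4 * 1024 * 1024).bind(43278);`, where the 4 MiB figure is an arbitrary example. The operating system may grant less than requested; on Linux, for instance, the value is capped by `net.core.rmem_max`.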

Here's what my UDP receiver looks like:

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");

// Parse every datagram as soon as it arrives.
udpserver.on("message", function (msg, rinfo) {
    seatStateStore.parseMessage(msg.toString());
});

Anyone have any ideas? I couldn't figure out any of the 3 options :/ Can someone help me out?

Node v0.10.29

Express v3.14.0

===============================

UPDATE / SOLUTION

Here's the code I ended up using (a slightly modified version of @RoyHB's solution):

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();

// Queue each datagram so the "message" handler returns immediately.
udpserver.on("message", function (msg, rinfo) {
    FIFO.push(msg.toString());
});

udpserver.bind(43278);

// Drain everything queued so far, then run again on the next event-loop turn.
function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    setImmediate(fetcher); // keep this function running continuously
}

fetcher();
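
One thing to watch in the loop above: `setImmediate(fetcher)` reschedules itself even when the queue is empty, so the process keeps spinning between messages. A possible variation, sketched below, schedules a drain pass only when something is pushed and caps how many messages are handled per pass so that socket I/O can run in between. `createMessageQueue`, `batchSize` and the plain array used as the queue are illustrative assumptions, not part of the `dequeue` package or of the code above.

```javascript
'use strict';

// Hypothetical helper: a FIFO that drains itself in bounded batches.
// `handler` is called once per message; `batchSize` caps the work per pass.
function createMessageQueue(handler, batchSize) {
    const items = [];      // plain array as the queue (fine for a sketch)
    let scheduled = false; // true while a drain pass is pending

    function schedule() {
        if (!scheduled) {
            scheduled = true;
            setImmediate(drain);
        }
    }

    function drain() {
        scheduled = false;
        let handled = 0;
        // Handle at most `batchSize` messages, then yield to the event loop.
        while (items.length > 0 && handled < batchSize) {
            handler(items.shift());
            handled += 1;
        }
        if (items.length > 0) {
            schedule(); // more work left: continue on a later turn
        }
    }

    return {
        push: function (msg) {
            items.push(msg);
            schedule();
        },
        size: function () {
            return items.length;
        },
        drain: drain // exposed so a pass can be triggered manually, e.g. in tests
    };
}
```

Wiring it in would mean creating the queue with something like `createMessageQueue(function (m) { seatStateStore.parseMessage(m); }, 100)` and calling its `push(msg.toString())` from the "message" handler. The batch size of 100 is a guess that would need tuning against the real message rate.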

Solution

  • There is an npm module called node-dequeue (loaded with require('dequeue')). I use it a lot for situations similar to yours.

    Basically:

    1. Your program pushes received messages onto the end of the queue.
    2. An interval timer periodically activates another function (a queue fetcher), which checks whether there are messages on the queue and, if so, fetches and processes one or more of them.
    3. Alternatively (maybe better), no timer is used to schedule queue fetches; instead, Node's process.nextTick method is used to continuously check the queue for messages. (The asker's final code above uses setImmediate for this, which lets pending I/O run between checks.)

    Ideally, seatStateStore.parseMessage would create a new object to process one message asynchronously, so that parseMessage returns without delay while the actual message processing continues in the background (see the outline at the bottom of the example code).

    I haven't tested the code below; it's meant to illustrate, not to run.

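    var Dequeue = require('dequeue');
    var seatStateStore = require("./SeatStateStore");
    var dgram = require("dgram");

    // require('dequeue') returns a constructor, so create an instance.
    var FIFO = new Dequeue();

    // Check the queue every millisecond.
    setInterval(fetcher, 1);

    var udpserver = dgram.createSocket("udp4");

    udpserver.on("message",
        function (msg, rinfo) {
            // parseMessage in the question expects a string
            FIFO.push(msg.toString());
        }
    );

    udpserver.bind(43278); // port taken from the question's update

    // Drain everything that has been queued since the last run.
    function fetcher () {
        while (FIFO.length > 0) {
            var msg = FIFO.shift();
            seatStateStore.parseMessage(msg);
        }
    }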
    

    ** OR (maybe better) **

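    var Dequeue = require('dequeue');
    var seatStateStore = require("./SeatStateStore");
    var dgram = require("dgram");

    // require('dequeue') returns a constructor, so create an instance.
    var FIFO = new Dequeue();

    var udpserver = dgram.createSocket("udp4");

    udpserver.on("message",
        function (msg, rinfo) {
            // parseMessage in the question expects a string
            FIFO.push(msg.toString());
        }
    );

    udpserver.bind(43278); // port taken from the question's update

    fetcher();

    function fetcher () {
        while (FIFO.length > 0) {
            var msg = FIFO.shift();
            seatStateStore.parseMessage(msg);
        }
        // Reschedule once per pass, outside the loop, so the check keeps
        // running even when the queue is empty. setImmediate is used instead
        // of process.nextTick because a self-rescheduling nextTick callback
        // runs before pending I/O and can keep the socket from delivering
        // messages.
        setImmediate(fetcher);
    }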
    

    Outline of seatStateStore.parseMessage:

    // asyncProcHandler is a placeholder for whatever does the real work asynchronously
    seatStateStore.parseMessage = function (msg) {
        var proc = new asyncProcHandler(msg, function (err) {
            if (err) {
                // handle the error
            }
        });
    };
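
To make the outline above a bit more concrete, here is a minimal sketch of what such a handler could look like. `asyncProcHandler` is not a real library; the name comes from the outline. Its body (deferring the work with `setImmediate` and reporting through an error-first callback) and the `processSeatMessage` placeholder are assumptions for illustration. It is written as a plain function rather than a constructor.

```javascript
'use strict';

// Placeholder for the real work (an assumption); throws on malformed input.
function processSeatMessage(msg) {
    if (typeof msg !== 'string' || msg.length === 0) {
        throw new Error('empty or non-string message');
    }
    return msg.trim();
}

// Hypothetical handler: defers the work to a later turn of the event loop
// and reports the outcome through an error-first callback.
function asyncProcHandler(msg, callback) {
    setImmediate(function () {
        let result;
        try {
            result = processSeatMessage(msg);
        } catch (err) {
            callback(err);
            return;
        }
        callback(null, result);
    });
}

const seatStateStore = {
    parseMessage: function (msg) {
        // Returns immediately; the message is processed on a later turn.
        asyncProcHandler(msg, function (err) {
            if (err) {
                console.error('could not process message:', err.message);
            }
        });
    }
};
```

Note that deferring with `setImmediate` only postpones the work; it still runs on Node's single JavaScript thread. If the parsing itself is CPU-heavy, messages can still pile up, so this helps mainly by keeping the "message" handler short.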