Tags: node.js, middleware, nats.io

The NATS client for NodeJS is very slow


I performed a latency test with the NATS benchmark tool, using two types of NATS clients.

A) The native NATS client (NATS CLI), which can be downloaded and installed from https://github.com/nats-io/natscli/releases

Procedure

Producer command : nats bench foo --pub 1 --request --msgs 100000 --size 3kb

Consumer command : nats bench foo --sub 1 --reply

Version : 0.0.35

Results : ~6600 Msg/s

Latency : 1/(2*6600) = 0.075 ms

NATS CLI Latency Test Result

B) Then I used the NATS client for NodeJS, writing a script based on the example provided in the NATS NodeJS GitHub repository, to run the same benchmark as with the native NATS client.

bench.js

#!/usr/bin/env node

const parse = require("minimist");
const {Nuid, connect, Bench, Metric} = require('nats')

// Fallback values handed to minimist as its `default` table. The short keys
// (s, c, p, i) are tied to their long names through the alias map below.
const defaults = {
  s: "127.0.0.1:4222",
  c: 100000,
  p: 128,
  // A generated subject is used when --subject is not supplied.
  subject: new Nuid().next(),
  i: 1,
  json: false,
  csv: false,
  csvheader: false,
  // Forwarded to connect() as the `pending` option.
  pending: 32 * 1024,
};

// Parsed command line. minimist accepts a single name wherever it accepts a
// list of names, so each one-element alias list is written as a plain string.
const argv = parse(process.argv.slice(2), {
  alias: {
    s: "server",
    c: "count",
    d: "debug",
    p: "payload",
    i: "iterations",
  },
  default: defaults,
  string: ["subject"],
  boolean: ["asyncRequests", "callbacks", "json", "csv", "csvheader"],
});

// Show the synopsis and stop when help is requested or when none of the
// --sub / --pub / --req modes was selected.
if (argv.h || argv.help || (!argv.sub && !argv.pub && !argv.req)) {
  console.log(
    "usage: bench.ts [--json] [--callbacks] [--csv] [--csvheader] [--pub] [--sub] [--req (--asyncRequests)] [--count <#messages>=100000] [--payload <#bytes>=128] [--iterations <#loop>=1>] [--server server] [--subject <subj>]\n",
  );
  process.exit(0);
}

// Numeric settings are parsed with an explicit radix so that the result never
// depends on a prefix in the input.
const server = argv.server;
const count = Number.parseInt(argv.count, 10);
const bytes = Number.parseInt(argv.payload, 10);
const iters = Number.parseInt(argv.iterations, 10);

// Reject non-numeric input up front instead of handing NaN to the benchmark.
for (
  const [flag, value] of Object.entries({
    count,
    payload: bytes,
    iterations: iters,
  })
) {
  if (Number.isNaN(value)) {
    console.error(`invalid value for --${flag}: expected an integer`);
    process.exit(1);
  }
}

// The former `pl` constant was dropped: it read `argv.pendingLimit`, an option
// this script never declares (the declared default is `pending`), so it was
// always NaN, and nothing in the script used it.

// Collects the metrics returned by every benchmark iteration.
const metrics = [];

// Runs `iters` benchmark iterations, each on its own connection, collecting
// the metrics of every run, and then prints them in the requested format.
(async () => {
  for (let i = 0; i < iters; i++) {
    const nc = await connect(
      { servers: server, debug: argv.debug, pending: argv.pending },
    );
    try {
      const bench = new Bench(nc, {
        msgs: count,
        size: bytes,
        asyncRequests: argv.asyncRequests,
        callbacks: argv.callbacks,
        pub: argv.pub,
        sub: argv.sub,
        req: argv.req,
        rep: argv.rep,
        subject: argv.subject,
      });
      const m = await bench.run();
      metrics.push(...m);
    } finally {
      // Release the connection even when the run fails.
      await nc.close();
    }
  }
})().then(() => {
  // Folds one sample `m` into the running aggregate `a` and returns `a`.
  const reducer = (a, m) => {
    a.name = m.name;
    a.payload = m.payload;
    a.bytes += m.bytes;
    a.duration += m.duration;
    a.msgs += m.msgs;
    a.lang = m.lang;
    a.version = m.version;
    a.async = m.async;

    a.max = Math.max(a.max === undefined ? 0 : a.max, m.duration);
    // The running minimum must be compared with the previous minimum; the
    // earlier code compared against `a.max`, which made `min` track the
    // latest sample instead of the smallest one.
    a.min = Math.min(a.min === undefined ? m.duration : a.min, m.duration);
    return a;
  };

  // --json takes precedence over --csv, as before.
  if (argv.json) {
    console.log(JSON.stringify(metrics, null, 2));
    return;
  }

  if (argv.csv) {
    const lines = metrics.map((m) => m.toCsv());
    if (argv.csvheader) {
      lines.unshift(Metric.header());
    }
    console.log(lines.join(""));
    return;
  }

  // Default report: one aggregated line per metric kind, in a fixed order,
  // skipping kinds for which no messages were recorded.
  for (const kind of ["pubsub", "pub", "sub", "req", "rep"]) {
    const total = metrics
      .filter((m) => m.name === kind)
      .reduce(reducer, new Metric(kind, 0));
    if (total && total.msgs) {
      console.log(total.toString());
    }
  }
}).catch((err) => {
  // Report the failure explicitly and signal it through the exit status.
  const reason = err instanceof Error ? err.message : String(err);
  console.error(`benchmark failed: ${reason}`);
  process.exitCode = 1;
});

Procedure

Command: node bench.js --subject test --req --payload 3500 --count 100000

NodeJS Client Version: 2.1.14

Results : ~330 Msg/s

Latency : 1/(2*330) = 1.5 ms

NATS NodeJS Latency Test Result

C) In addition, I tried to create two clients, a publisher which sends messages and a subscriber which consumes them, to measure the transport time of the messages in question, without going through the benchmark tool. The results are of the same order as the latency time calculated by the benchmark tool, bench.js.

pub.js

#!/usr/bin/env node

const parse = require("minimist");
const { connect, StringCodec, headers, credsAuthenticator } = require("nats");
const { delay } = require("./utils.js");
const fs = require("fs");

// Command-line options of the publisher.
// Every aliased option is declared as a string, so a value supplied for
// -c/--count or -i/--interval arrives as text; only the defaults are numbers.
// `h` is the short name of --headers, so argv.h carries the header text.
const argv = parse(process.argv.slice(2), {
  alias: {
    s: "server",
    c: "count",
    i: "interval",
    f: "creds",
    h: "headers",
  },
  default: { s: "127.0.0.1:4222", c: 1, i: 0 },
  boolean: true,
  string: ["server", "count", "interval", "headers", "creds"],
});

// Settings derived from the parsed command line: `subject [msg]` are the
// positional arguments, the rest comes from flags.
const opts = { servers: argv.s };

// A missing subject must stay detectable: String(undefined) would produce the
// truthy text "undefined" and the usage check below would never fire.
const subject = argv._[0] === undefined ? null : String(argv._[0]);

// Only an absent message falls back to the empty payload, so that a literal
// `0` on the command line is still published.
const payload = argv._[1] === undefined ? "" : argv._[1];

// The count may arrive as text (for example "-1"), so it is parsed before it
// is compared. -1 means "publish forever"; anything unusable falls back to 1.
const requestedCount = Number.parseInt(argv.c, 10);
const count = requestedCount === -1
  ? Number.MAX_SAFE_INTEGER
  : (requestedCount || 1);

// Pause between messages in milliseconds; 0 or unusable input means no pause.
const interval = Number(argv.i) || 0;

// `-h` is the short form of --headers in this tool, so only --help (or a
// missing subject) requests the synopsis. Testing argv.h here would abort
// every run that supplies headers.
if (argv.help || !subject) {
  console.log(
    "Usage: nats-pub [--creds=/path/file.creds] [-s server] [-c <count>=1] [-i <interval>=0] [-h='k=v;k2=v2'] subject [msg]",
  );
  console.log("to publish forever, specify -c=-1 or --count=-1");
  process.exit(1);
}

// Optional connection settings: debug flag first, then credentials.
if (argv.debug) {
  Object.assign(opts, { debug: true });
}

// --creds names a file whose contents are handed to credsAuthenticator.
if (argv.creds) {
  opts.authenticator = credsAuthenticator(fs.readFileSync(argv.creds));
}

// Connects, publishes `count` messages on `subject`, each carrying a "time"
// header with the send timestamp, and then flushes and closes the connection.
(async () => {
  let nc;
  try {
    nc = await connect(opts);
  } catch (err) {
    console.log(`error connecting to nats: ${err.message}`);
    // A failed connection is a failed run; reflect it in the exit status.
    process.exitCode = 1;
    return;
  }
  console.info(`connected ${nc.getServer()}`);

  nc.closed()
    .then((err) => {
      if (err) {
        console.error(`closed with an error: ${err.message}`);
      }
    });

  // User-supplied headers come as "k=v;k2=v2".
  const hdrs = headers();
  if (argv.headers) {
    for (const pair of argv.headers.split(";")) {
      if (pair === "") {
        // Tolerate a trailing or doubled ";".
        continue;
      }
      // Split on the first "=" only, so that values may contain "=".
      const eq = pair.indexOf("=");
      const key = eq === -1 ? pair : pair.slice(0, eq);
      const value = eq === -1 ? "" : pair.slice(eq + 1);
      hdrs.append(key, value);
    }
  }
  hdrs.append("time", "0");

  // The payload does not change between messages, so it is encoded once.
  const sc = StringCodec();
  const data = sc.encode(String(payload));

  try {
    for (let i = 1; i <= count; i++) {
      // Stamp each message with its send time just before publishing.
      hdrs.set("time", Date.now().toString());
      nc.publish(subject, data, { headers: hdrs });
      console.log(`[${i}] ${subject}: ${payload}`);
      if (interval) {
        await delay(interval);
      }
    }
    await nc.flush();
  } finally {
    // Close the connection even when publishing or flushing fails.
    await nc.close();
  }
})().catch((err) => {
  const reason = err instanceof Error ? err.message : String(err);
  console.error(`publish failed: ${reason}`);
  process.exitCode = 1;
});

sub.js

#!/usr/bin/env node

const parse = require("minimist");
const { connect, StringCodec, credsAuthenticator } = require('nats');
const fs = require("fs");

// Command-line options of the subscriber. The queue name defaults to the
// empty string, which the listening message later treats as "no queue".
const argv = parse(process.argv.slice(2), {
  alias: {
    d: "debug",
    s: "server",
    q: "queue",
    f: "creds",
  },
  default: { s: "127.0.0.1:4222", q: "" },
  boolean: ["debug"],
  string: ["server", "queue", "creds"],
});

/**
 * Prints the one-line command synopsis of the subscriber to standard output.
 */
function usage() {
  const synopsis =
    "Usage: nats-sub [-s server] [--creds=/path/file.creds] [-q queue] [-h] subject";
  console.log(synopsis);
}

// Connection options and the subject to listen on (first positional argument).
const opts = { servers: argv.s };
const [firstPositional] = argv._;
const subject = firstPositional ? String(firstPositional) : null;

// Print the synopsis and stop when help is requested or no subject was given.
const helpRequested = argv.h || argv.help;
if (helpRequested || !subject) {
  usage();
  process.exit(1);
}

if (argv.d) {
  Object.assign(opts, { debug: true });
}

// --creds names a file whose contents are handed to credsAuthenticator.
if (argv.creds) {
  opts.authenticator = credsAuthenticator(fs.readFileSync(argv.creds));
}


// Connects, subscribes to `subject`, and prints every message together with
// the time elapsed since the timestamp found in its "time" header.
(async () => {
  let nc;
  try {
    nc = await connect(opts);
  } catch (err) {
    console.log(`error connecting to nats: ${err.message}`);
    // A failed connection is a failed run; reflect it in the exit status.
    process.exitCode = 1;
    return;
  }

  console.info(`connected ${nc.getServer()}`);
  nc.closed()
    .then((err) => {
      if (err) {
        console.error(`closed with an error: ${err.message}`);
      }
    });

  const sc = StringCodec();
  const sub = nc.subscribe(subject, { queue: argv.q });
  console.info(`${argv.q !== "" ? "queue " : ""}listening to ${subject}`);
  for await (const m of sub) {
    // Take the receive timestamp first, before any other work on the message.
    const receivedTime = Date.now();

    // Stays null when the message has no usable "time" header.
    let transportTime = null;
    if (m.headers) {
      for (const [key, value] of m.headers) {
        if (key === "time") {
          const sentTime = Number.parseInt(value.toString(), 10);
          if (!Number.isNaN(sentTime)) {
            transportTime = receivedTime - sentTime;
          }
          break;
        }
      }
    }

    // -1 marks "unknown". An explicit null test is required because a genuine
    // transit time of 0 ms is falsy and was previously reported as -1.
    const shownTime = transportTime === null ? -1 : transportTime;
    console.log(`[${sub.getProcessed()}]: ${m.subject}: ${sc.decode(m.data)} after ${shownTime} ms`);
  }
})().catch((err) => {
  const reason = err instanceof Error ? err.message : String(err);
  console.error(`subscriber failed: ${reason}`);
  process.exitCode = 1;
});

Procedure

Command pub : node pub.js -c 50 -i 10 test --debug

Command sub : node sub.js --debug test

Version : 2.1.14

Transport time : ~2.5ms

Transport Time NODEJS

My question is: why is there such a glaring difference between the NATS CLI and the NodeJS client? Is it due to the language, given that NATS is natively written in Go?


Solution

  • The short answer is that the Go client (the one used by the nats tool) will be much faster, because the client can use different threads to send/receive and process network traffic.

    In Node and other JavaScript runtimes, while the client is doing something, it cannot do something else. This means that if it is busy doing i/o, it cannot process subscription messages, etc.

    The i/o of a test involving a publisher can be affected by the size of the outbound buffer. By default this is 32K in the JavaScript clients, which is a good working size (I believe there's an option to change it in the test; if not, I will update the bench). When publishing in a loop, once the outbound buffer is filled, the client sends it to the network. Typically a client does some processing and then goes idle, and the buffer is flushed on the next turn of the event loop. In a test loop, however, the client fills the buffer, must send it, and then continues. So the size of this buffer greatly affects the network i/o.

    In your screenshots, you are using debug. Never put the server or the client in debug mode when running performance tests.

    Request Test

    There are a few things you should try:

    node examples/bench.js --subject a --req --payload 3500 --count 10000
    req 791 msgs/sec - [25.30 secs] ~ 2.64 MB/sec 25295/25295
    
    node examples/bench.js --subject a --req --payload 3500 --count 10000 --asyncRequests
    req 36,430 msgs/sec - [0.55 secs] ~ 243.20 MB/sec 549/549
    
    node examples/bench.js --subject a --req --payload 3500 --count 10000 --asyncRequests --callbacks
    req 38,462 msgs/sec - [0.52 secs] ~ 256.76 MB/sec 520/520
    

    Pub test

    Raw blast performance of the node client:

    node examples/bench.js --subject a --pub --payload 0
    pub 917,431 msgs/sec - [0.11 secs] ~ 0.00 B/sec 109/109
    
    # If you actually add a payload it will take longer (after all you are moving about 350 MB: 100,000 messages x 3,500 bytes):
    
    node examples/bench.js --subject a --pub --payload 3500
    pub 290,698 msgs/sec - [0.34 secs] ~ 970.31 MB/sec 344/344
    

    Sub test

    node examples/bench.js --subject a --sub
    sub 294,118 msgs/sec - [0.34 secs] ~ 981.72 MB/sec 340/340
    
    # and in a different terminal, run the publisher:
    nats bench --pub 1 --msgs 1000000 --size 3500 -s localhost:4222 a
    11:48:16 Starting Core NATS pub/sub benchmark [subject=a, multisubject=false, multisubjectmax=0, msgs=1,000,000, msgsize=3.4 KiB, pubs=1, subs=0, pubsleep=0s, subsleep=0s]
    11:48:16 Starting publisher, publishing 1,000,000 messages
    Finished      1s
    
    Pub stats: 608,014 msgs/sec ~ 1.98 GB/sec
    

    Specifying the --callbacks option will increase the performance. Also, the publisher implementation will limit how fast the sub will perform, so:

    node examples/bench.js --subject a --sub --callbacks
    sub 370,370 msgs/sec - [0.27 secs] ~ 1.21 GB/sec 270/270
    

    [repeat the nats bench above]

    If doing both, the numbers will be a bit different, mostly because the JavaScript runtime is either publishing or processing at any given moment.

    node examples/bench.js --subject a --sub --callbacks --pub
    pubsub 930,233 msgs/sec - [0.21 secs] ~ 113.55 MB/sec 215/215
    pub 462,963 msgs/sec - [0.22 secs] ~ 56.51 MB/sec 216/216
    sub 952,381 msgs/sec - [0.10 secs] ~ 116.26 MB/sec 105/105