Tags: node.js, node-streams

NodeJS stream ends prematurely unless `highWaterMark` value is reduced


I'm playing around with Node.js streams. Essentially, I'm reading a file, running it through some processing - in my case, uppercasing the text - and then sending the output to the front-end.

On the front-end, I have an EventSource that establishes a connection to the endpoint and gets data.
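
The client is just a bare EventSource, along these lines (a minimal sketch; the output element id is illustrative, not my actual markup):

const es = new EventSource('/hello');
es.onmessage = (event) => {
  // Append each SSE "data:" payload as it arrives.
  document.getElementById('output').textContent += event.data;
};
es.onerror = () => {
  // EventSource reconnects automatically once the stream closes,
  // so close it here to avoid re-requesting the whole file.
  es.close();
};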

The only "catch" is that I'm trying to simulate a delay in the transformation logic.

const express = require('express');
const app = express();
const port = 3010;
const path = require('path');

const { Transform } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

app.use(express.static('static'));

app.get('/', (req, res) => {
  res.sendFile(path.resolve('pages/index.html'));
});

const processChunk = async function (chunk) {
  // simulate delay
  await new Promise((resolve) => {
    setTimeout(resolve, 50);
  });
  return chunk.toUpperCase();
};

app.get('/hello', async (req, res) => {
  res.type('text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  const readStream = fs.createReadStream('./sample.txt', {
    highWaterMark: 5 * 1024, // does not work unless this value is changed to 2.5 * 1024
  });

  async function* processStream(source, { signal }) {
    source.setEncoding('utf-8');
    for await (const chunk of source) {
      yield processChunk(chunk);
    }
  }

  const transformStream = new Transform({
    async transform(chunk, encoding, callback) {
      const s = chunk.toString();
      res.write(`data: ${s}\n\n`);
      callback();
    },
  });

  transformStream.on('end', () => {
    console.log('ending');
    res.end();
  });

  await pipeline(readStream, processStream, transformStream);
});

app.listen(port, () => {
  console.log(`Example app listening at http://localhost:${port}`);
});

I have a 20 KB file; now, all of this works as long as the highWaterMark value does not exceed 2.5 KB.

StackBlitz


Solution

  • highWaterMark sets the threshold for how much data a stream buffers in memory before reading is paused while the buffer drains. You can picture your server's buffer as a pipe between the file and the front-end, and that pipe can only hold so much water at once. The purpose of highWaterMark is to tell your program how much data it can take in before it has to wait for the pipe to empty.

    So what is happening is that your server can't handle chunks of 5 * 1024 bytes, but it can handle batches of 2.5 * 1024 bytes. Without an appropriate highWaterMark you end up with crashes, high memory usage, and poor garbage-collector performance. Backpressure is worth reading up on here as well; the sketch after this answer shows the standard pattern for it.

    You could also keep the number as-is by increasing the resources available to your server.
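
    As a rough illustration of that backpressure pattern (a sketch, not the exact fix for the code above; makeSseTransform is just an illustrative helper name): instead of fire-and-forget res.write() calls inside the Transform, you can hold the transform callback until the response has drained:

    const { Transform } = require('node:stream');

    // Sketch: a Transform that respects backpressure from `res`.
    // When res.write() returns false, the response buffer is full, so we
    // defer the callback until 'drain' fires; pipeline() then stops
    // pulling new chunks from the file until the socket catches up.
    const makeSseTransform = (res) =>
      new Transform({
        transform(chunk, encoding, callback) {
          const ok = res.write(`data: ${chunk.toString()}\n\n`);
          if (ok) {
            callback();
          } else {
            res.once('drain', callback);
          }
        },
      });

    With this pattern the read stream is throttled by the client connection rather than by a hand-tuned highWaterMark value.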