javascriptnode.jsnode-streams

Node Stream API, pull function equivalent


I have earlier used the "Web Stream" API in order to continuously read items from a REST service and enqueue them in the stream according to something like this:

stream = new ReadableStream({
   start: controller => {
      //do some initialization
   },
   pull: controller => {
      fetch('http://restservice/to/fetch/items')
      .then(response => response.json())
      .then(item => controller.enqueue(item));
   }
}) 

Whenever I read from this stream with

stream.getReader().read().then(item => {
  console.log(item);
});

the pull function automatically fetches the next item and enqueues it.

I now need to use the same pattern for Node's native "Stream" API, but I cant figure out how to implement it. There is no "pull" equivalent it seems. How do I keep pushing new items automatically onto the stream whenever I read from it?


Solution

  • Node.js Stream API works differently from the browser's ReadableStream. You usually make a custom Readable stream and manage data pushing manually.

    const { Readable } = require('stream');
    const fetch = require('node-fetch');
    
    class CustomReadableStream extends Readable {
      constructor(options) {
        super(options);
        // do initialization here if needed
      }
    
      _read() {
        // Fetch data from the REST service
        fetch('http://restservice/to/fetch/items')
          .then(response => response.json())
          .then(item => {
            // Push the fetched item into the stream
            this.push(JSON.stringify(item));
          })
          .catch(err => {
            // Handle errors appropriately
            this.emit('error', err);
          });
      }
    }
    
    // Create an instance of your custom stream
    const customStream = new CustomReadableStream();
    
    // Read from the stream
    customStream.on('data', chunk => {
      console.log('Received:', chunk.toString());
    });
    
    // Handle stream errors
    customStream.on('error', err => {
      console.error('Stream error:', err);
    });

    I extended Node.js's Readable class. In the _read method, you get data from your REST service and push it into the stream using this.push(). When you read from this stream (customStream), it will automatically fetch items and send them as chunks through the 'data' event.

    Make sure to handle errors properly by emitting the 'error' event if something goes wrong in the _read method.