javascriptnode.jsrxjsrxjs5

How to convert node readable stream to RX observable


If I have a Node js stream, say for example from something like process.stdin or from fs.createReadStream, how can I convert this to be an RxJs Observable stream using RxJs5?

I see that RxJs-Node has a fromReadableStream method, but that looks like it hasn't been updated in close to a year.


Solution

  • For anyone looking for this, following Mark's recommendation, I adapted rx-node fromStream implementation for rxjs5.

    import { Observable } from 'rxjs';
    
    // Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
    export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
      stream.pause();
    
      return new Observable((observer) => {
        function dataHandler(data) {
          observer.next(data);
        }
    
        function errorHandler(err) {
          observer.error(err);
        }
    
        function endHandler() {
          observer.complete();
        }
    
        stream.addListener(dataEventName, dataHandler);
        stream.addListener('error', errorHandler);
        stream.addListener(finishEventName, endHandler);
    
        stream.resume();
    
        return () => {
          stream.removeListener(dataEventName, dataHandler);
          stream.removeListener('error', errorHandler);
          stream.removeListener(finishEventName, endHandler);
        };
      }).share();
    }
    

    Note that it intrinsically breaks all back pressure functionalities of streams. Observables' are a push technology. All input chunks are going to be read and pushed to the observer as quickly as possible. Depending on your case, it might not be the best solution.