node.js, stream, pass-through

Node.js PassThrough Stream


I want to transmit an fs.ReadStream over a net.Socket (TCP) stream, so I use .pipe. When the fs.ReadStream finishes, I don't want the net.Socket stream to end, which is why I use

readStream.pipe(socket, { end: false })

Unfortunately, I don't get 'close', 'finish' or 'end' on the other side, so I can't close my fs.WriteStream there. The net.Socket connection stays open, which I need because I would like to receive an ID as a response. But since the other side never sees 'close' or 'finish', it can't end its fs.WriteStream and therefore can't send a response with the corresponding ID.

Is there a way to manually send a 'close' or 'finish' event over the net.Socket without closing it? With socket.emit, only my own local listeners react. Can anyone tell me what I am doing wrong?

    var socket : net.Socket; //TCP connect
    var readStream = fs.createReadStream('test.txt');

    socket.on('connect', () => {
        readStream.pipe(socket, {
            end: false
        })
        readStream.on('close', () => {
            socket.emit('close');
            socket.emit('finish');
        })

        //waiting for answer

        socket.on('data', (c) => {
            console.log('got my answer: ' + c.toString());
        })
    })

Solution

  • Well, there's not really much you can do with a single stream except give the other side some way to know, programmatically, that the stream has ended.

    When you end the socket, it flushes its buffer and then closes its side of the TCP connection. The other side sees this as an 'end' event once the last byte has been delivered, and a write stream piped from it then gets its 'finish'. To re-use the connection, you can consider these two options:
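
    As for the socket.emit('close') calls in the question: emit only invokes listeners registered on that same object in the same process and puts nothing on the wire, which is why only your own handlers react. A minimal sketch of this, using two PassThrough streams as stand-ins for the socket and the remote end (the names source, dest and seen are made up for the illustration):

    ```javascript
    const { PassThrough } = require('stream');

    // Stand-ins for illustration only: `source` plays the local socket,
    // `dest` plays whatever sits on the receiving end of the pipe.
    const source = new PassThrough();
    const dest = new PassThrough();
    source.pipe(dest, { end: false });

    const seen = [];
    source.on('finish', () => seen.push('source finish'));
    dest.on('finish', () => seen.push('dest finish'));

    // emit() synchronously calls the listeners on `source` and nothing else.
    source.emit('finish');

    console.log(seen); // only the listener on `source` has run
    console.log(dest.writableEnded); // false: `dest` was never told to end
    ```

    So the receiver needs an actual signal in the data it reads, which is what both options below provide.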

    One: Use HTTP keep-alive

    As you can imagine, you're not the first person to face this problem. It is actually a common one, and some protocols like HTTP already have you covered. This introduces a minor overhead, but only when starting and ending the streams, which in your case may be more acceptable than the other options.

    Instead of using basic TCP streams, you can just as simply use HTTP connections and send your data over HTTP requests. An HTTP POST request would do fine, and your code wouldn't look any different apart from dropping that {end: false}. The request needs its headers sent, so it'd be constructed like this:

    import * as http from 'http';
    // http.request() has no `url` option; pass host, port and path instead
    const socket : http.ClientRequest = http.request({
       method: 'POST',
       host: 'wherever.org', port: 9087, path: '/somewhere/there',
       headers: {
          'connection': 'keep-alive',
          'transfer-encoding': 'chunked'
       }
    }, (res) => {
       // here you can call the code to push more streams
    });
    
    readStream.pipe(socket); // our request will end, but the underlying connection will stay open.
    

    You don't actually need to wait for the socket to connect; you can pipe the stream directly as in the example above, but do check how this behaves if your connection fails. Note that http.ClientRequest does not re-emit the TCP 'connect' event (its own 'connect' event is for HTTP CONNECT requests). If you need the underlying socket's events, listen for the request's 'socket' event and attach your listeners to the socket it hands you.
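
    On the receiving side, the HTTP option could look roughly like the sketch below. Each request body ends on its own, so the file stream gets its 'finish' and the response can carry the ID while the keep-alive connection stays open. The handler name handleUpload, the upload- file prefix, the temp directory and the use of crypto.randomUUID() for the ID are all assumptions for illustration, not something from the question:

    ```javascript
    const http = require('http');
    const fs = require('fs');
    const os = require('os');
    const path = require('path');
    const crypto = require('crypto');

    // Hypothetical handler: stores one request body in a file and answers with its ID.
    function handleUpload(req, res, dir = os.tmpdir()) {
      const id = crypto.randomUUID(); // assumed ID scheme
      const output = fs.createWriteStream(path.join(dir, `upload-${id}`));

      // The request body ends by itself, which ends the file stream too.
      req.pipe(output);

      // 'finish' fires once everything has been handed to the file.
      output.on('finish', () => {
        res.end(id);
      });
      output.on('error', (err) => {
        res.statusCode = 500;
        res.end(err.message);
      });
      return id;
    }

    const server = http.createServer((req, res) => {
      handleUpload(req, res);
    });
    // server.listen(9087); // port borrowed from the client example; adjust as needed
    ```

    Because the response is only sent from the 'finish' handler, the client's response callback doubles as the confirmation that the file was fully written.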

    Oh, and a word of warning: TCP keep-alive is a different thing, so don't get confused there.

    Two: Use a "magic" end packet

    In this case you'd send a simple end packet, for instance \x00 (a NUL character), at the end of the stream. This has a major drawback: you need to process the stream to make sure a NUL character doesn't appear anywhere else in it, which adds overhead to the data processing (so more CPU usage).

    To do this, you need to push the data through a transform stream before you send it to the socket. Below is an example, but it works on strings only, so adapt it to your needs.

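    import { Transform } from 'stream';

    const zeroEncoder = new Transform({
      encoding: 'utf-8',
      // the callback signature is cb(error, data); escape every NUL, not just the first
      transform(chunk, enc, cb) { cb(null, chunk.toString().replace(/\x00/g, '\\x00')); },
      // append the end marker once the source has ended
      flush: (cb) => cb(null, '\x00')
    });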
    
    // ... wherever you do the writing:
    
    readStream
      .pipe(zeroEncoder)
      .on('unpipe', () => console.log('this will be your end marker to send in another stream'))
      .pipe(socket, {end: false})
    

    Then on the other side:

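    tcpStream.on('data', (chunk) => {
      // decodeZeros is a helper you'd write to reverse the escaping done by zeroEncoder
      const text = chunk.toString();
      if (text.endsWith('\x00')) {
        // strip the end marker before writing the last piece
        output.end(decodeZeros(text.slice(0, -1)));
        // and rotate output
      } else {
        output.write(decodeZeros(text));
      }
    });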
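
    The receiving code above leaves decodeZeros undefined and assumes the marker always arrives as the last byte of a chunk, which TCP does not guarantee. A rough sketch of both pieces follows; decodeZeros here is one possible counterpart to zeroEncoder, and splitAtMarkers is a hypothetical helper name. It still works on strings only, and it assumes neither an escape sequence nor a multi-byte character is split across two chunks:

    ```javascript
    // One possible counterpart to zeroEncoder: turns the escaped text \x00 back into a NUL.
    // Assumption: the payload never contains the literal four characters \x00 by itself.
    function decodeZeros(text) {
      return text.replace(/\\x00/g, '\x00');
    }

    // Hypothetical helper: splits one received chunk at NUL end markers.
    // Each piece carries a flag telling whether an end marker followed it.
    function splitAtMarkers(chunk) {
      const parts = chunk.toString().split('\x00');
      return parts
        .map((text, i) => ({
          data: decodeZeros(text),
          ended: i < parts.length - 1,
        }))
        // drop the empty tail left over when the chunk ends exactly on a marker
        .filter((piece) => piece.data.length > 0 || piece.ended);
    }

    // Possible use inside the 'data' handler:
    //   for (const piece of splitAtMarkers(chunk)) {
    //     output.write(piece.data);
    //     if (piece.ended) { output.end(); /* and rotate output */ }
    //   }
    ```

    Splitting on the marker first and decoding afterwards keeps the two concerns apart: real end markers are raw NUL bytes, while escaped ones only become NUL again after the split.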
    

    As you can see, this is way more complicated, and it is also just an example. You could simplify it a bit by using JSON, 7-bit transfer encoding or some other way, but in all cases it will need some trickery and, most importantly, reading through the whole stream and considerably more memory, so I don't really recommend this approach.

    I hope this is helpful.