node.jstypescriptrxjstcpserver

Observable recurses on itself, and I can't find out why


I'm a newbie with the rxjs library, still learning it, and I'm trying to implement a TCP server in node.js using Observable, and using a pattern I saw at my company that consists of having functions that takes an Observable as input and return another output Observable that will be sent back to the client, so I came up with this code

import net from "net";
import * as rx from "rxjs";

type MaybePromise<T> = T | PromiseLike<T>;
type ConnectionHandler = (address: net.AddressInfo, input$: rx.Observable<Buffer>) => MaybePromise<rx.Observable<Buffer>>;

const createServer = (port: number, handler: ConnectionHandler, opts?: net.ServerOpts) => {
  const server = net.createServer(opts);

  new rx.Observable<net.Socket>(subscriber => {
    server
      .on("connection", socket => subscriber.next(socket))
      .on("error", err => subscriber.error(err));
  }).subscribe(async socket =>
    (await handler(
      socket.address() as net.SocketAddress,
      new rx.Observable<Buffer>(subscriber => {
        socket
          .on("end", () => socket.destroySoon())
          .on("error", err => subscriber.error(err))
          .on("data", request => subscriber.next(request))
          .on("close", () => subscriber.complete());
      }),
    )).subscribe({
      next: response => {
        console.log(response.toString());
        socket.emit("data", response);
      },
      error: () => socket.destroySoon(),
    })
  );

  server.listen(port);
};

createServer(3000, async (address, input$) => {
  console.log(`New client: ${address.address}`);

  const login = await rx.firstValueFrom(input$);

  if (login.toString() != "login\n")
    return rx.throwError(() => new Error("login failure"));

  return input$.pipe(
    rx.map(data => {
      return Buffer.from(`ACK: ${data.toString()}`, "utf-8");
    })
  );
});

So basically in this example I want to wait for a login\n request for the client, and then for each request, I want to echo it back to the client, adding ACK: in front of it, but when creating my output observable it adds ACK thousands of times on the same massage, so it seems like the output is coming back to the function as input each time, but I don't know why, I'm not changing the input Observable, I'm just returning a new one that will be processed to be sent to the client, can you help me point out where the mistake comes from ?


Solution

  • You both have:

    socket
    //[...]
    .on("data", request => subscriber.next(request))
    

    And:

    // [...]
    .subscribe({
          next: response => {
            // [...]
            socket.emit("data", response);
          },
    })
    

    I'm guessing this is what causes the infinite loop.

    You'll want to replace socket.emit("data", response) with socket.write(response).

    Have a look at the docs for Socket#write.