scalaakkaproject-reactorrsocketpekko

Is there a way to create a RSocket "forRequestStream" and return it as a Pekko/Akka Sink without using the deprecated FluxProcessor?


In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.

On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).

Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

Solution

  • So, I have a solution, but it's a little hacky. For reasons I don't understand, the publisher from pekko needs a subscriber other than the ones over the ones from RSocket. This works, but with the caveat that the sink will consume everything it can if there are no connected clients (which is actually preferred in my use case)

    def serverSink : Sink[Payload, NotUsed] = {
        val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
        sink.mapMaterializedValue { pub =>
          pub.subscribe(new Subscriber[Payload] {
            override def onComplete(): Unit = ()
            override def onError(t: Throwable): Unit = ()
            override def onNext(t: Payload): Unit = ()
            override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
          })
          RSocketServer.create(
            SocketAcceptor.forRequestStream(payload =>
              Flux.from(pub)
          )).bindNow(TcpServerTransport.create("localhost", 3141))
          NotUsed.notUsed()
        }
      }