
Create a Flow[ByteString, ByteString, NotUsed] by piping through an InputStream


I need to decompress data that uses a compression format Akka does not support, but which another library supports through an InputStream interface. To make it work with Akka Streams, I need to implement this function:

def pipeThroughInputStream(pipeThrough: InputStream => InputStream): Flow[ByteString, ByteString, NotUsed]

But I'm not sure how to do it. I'm aware of conversion functions like StreamConverters.asInputStream and StreamConverters.fromInputStream, but I don't know how to apply them here. All I have come up with so far is:

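  import java.io.InputStream
  import akka.NotUsed
  import akka.stream.IOResult
  import akka.stream.scaladsl.{Flow, Sink, Source, StreamConverters}
  import akka.util.ByteString
  import scala.concurrent.Future

  def pipeThroughInputStream(pipeThrough: InputStream => InputStream): Flow[ByteString, ByteString, NotUsed] = {
    val sink: Sink[ByteString, Source[ByteString, Future[IOResult]]] = StreamConverters.asInputStream().mapMaterializedValue { materializedInputStream =>
      val inputStream = pipeThrough(materializedInputStream)
      StreamConverters.fromInputStream(() => inputStream)
    }
    ???
  }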

But I don't know how to turn this Sink, which materializes to a Source, back into a Flow.


Solution

  • What you'll want is something along the lines of this:

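    import java.io.InputStream
    import akka.NotUsed
    import akka.stream.Materializer
    import akka.stream.scaladsl.{Flow, StreamConverters}
    import akka.util.ByteString

    def pipeThroughInputStream(pipeThrough: InputStream => InputStream)(implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] = {
      // pipeThrough is applied inside the factory, so wrappers that read eagerly in their
      // constructor (e.g. GZIPInputStream reads the header) don't block before data flows
      val sink = StreamConverters.asInputStream().mapMaterializedValue { materializedInputStream =>
        StreamConverters.fromInputStream(() => pipeThrough(materializedInputStream))
      }
      val (source, matSink) = sink.preMaterialize()
      Flow.fromSinkAndSource(matSink, source)
    }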
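    For reference, here is a self-contained sketch that exercises this approach end to end. It assumes Akka 2.6 (for Materializer(system)) and uses java.util.zip.GZIPInputStream purely as a stand-in for the third-party decompressor, because Akka's Compression.gzip can produce test input for it. The object and method names (PipeThroughDemo, roundTripOk) are hypothetical.

```scala
import java.io.InputStream
import java.util.zip.GZIPInputStream

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Compression, Flow, Sink, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object PipeThroughDemo {
  // Eager wiring: the asInputStream sink is pre-materialized when this is called,
  // and pipeThrough only runs once the fromInputStream source starts.
  def pipeThroughInputStream(pipeThrough: InputStream => InputStream)(
      implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] = {
    val sink = StreamConverters.asInputStream().mapMaterializedValue { in =>
      StreamConverters.fromInputStream(() => pipeThrough(in))
    }
    val (source, matSink) = sink.preMaterialize()
    Flow.fromSinkAndSource(matSink, source)
  }

  // Gzips a payload with Akka, decompresses it through GZIPInputStream
  // (standing in for the third-party library) and compares the result.
  def roundTripOk(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("pipe-through-demo")
    implicit val mat: Materializer = Materializer(system)
    try {
      val original = ByteString("hello " * 1000)
      val collect = Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)
      val compressed =
        Await.result(Source.single(original).via(Compression.gzip).runWith(collect), 10.seconds)
      // Feed the compressed bytes in 64-byte chunks so the InputStream sees several pieces
      val decompressed = Await.result(
        Source(compressed.grouped(64).toList)
          .via(pipeThroughInputStream(in => new GZIPInputStream(in)))
          .runWith(collect),
        10.seconds)
      decompressed == original
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(roundTripOk())
}
```

    The 64-byte chunk size is arbitrary; it only ensures the decompressor receives its input across several ByteString elements rather than in one piece.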
    

    Pre-materializing the sink gives us access to the Source it materializes. We then use fromSinkAndSource to create a Flow that uses the pre-materialized sink as its sink side and that Source as its source side. The two sides are decoupled: downstream cancellation will not stop the upstream. That may not matter with a finite source, or if there is some other means of stopping the upstream, such as a KillSwitch. fromSinkAndSourceCoupled would couple them, but it would also complete the downstream as soon as the upstream completes, potentially before whatever happens in "InputStream land" has finished.

    If you want something more aware of the state of the InputStream, you could implement a custom stage with the desired Reactive Streams semantics.

    Note that pre-materializing does all the InputStream wiring before the rest of the stream materializes (i.e. while you're still building the "blueprint" for the stream). As a consequence, the returned Flow is single-use: it should only be materialized once. If that's undesirable, you can be a little more obscure and replace the last two lines with something like:

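    Flow.lazyFlow { () =>
      // preMaterialize returns (materialized value, sink); same decoupled wiring as above
      val (source, matSink) = sink.preMaterialize()
      Flow.fromSinkAndSource(matSink, source)
    }.mapMaterializedValue(_ => NotUsed) // lazyFlow materializes to a Future[NotUsed]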
    

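    This delays the InputStream wiring until there is demand from downstream. Because the factory runs again on every materialization, each run gets its own InputStream pair, so the resulting Flow can also be materialized more than once.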
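    A sketch of the lazy variant, again assuming Akka 2.6 (where Flow.lazyFlow was introduced) and GZIPInputStream as a stand-in decompressor; LazyPipeThroughDemo, pipeThroughInputStreamLazily and reusableTwice are hypothetical names. It keeps the decoupled fromSinkAndSource wiring and materializes the same Flow value twice to show that each run gets fresh wiring:

```scala
import java.io.InputStream
import java.util.zip.GZIPInputStream

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Compression, Flow, Sink, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object LazyPipeThroughDemo {
  def pipeThroughInputStreamLazily(pipeThrough: InputStream => InputStream)(
      implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] = {
    val sink = StreamConverters.asInputStream().mapMaterializedValue { in =>
      StreamConverters.fromInputStream(() => pipeThrough(in))
    }
    Flow
      .lazyFlow { () =>
        // Called once per materialization, so every run pre-materializes its own sink
        val (source, matSink) = sink.preMaterialize()
        Flow.fromSinkAndSource(matSink, source)
      }
      .mapMaterializedValue(_ => NotUsed) // discard the Future[NotUsed] from lazyFlow
  }

  // Materializes one Flow value twice and checks both runs decompress correctly.
  def reusableTwice(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("lazy-pipe-through-demo")
    implicit val mat: Materializer = Materializer(system)
    try {
      val original = ByteString("hello " * 1000)
      val collect = Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)
      val compressed =
        Await.result(Source.single(original).via(Compression.gzip).runWith(collect), 10.seconds)
      val gunzip = pipeThroughInputStreamLazily(in => new GZIPInputStream(in))
      val runs = (1 to 2).map { _ =>
        Await.result(Source(compressed.grouped(64).toList).via(gunzip).runWith(collect), 10.seconds)
      }
      runs.forall(_ == original)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(reusableTwice())
}
```

    With the eager version, the second materialization would reuse an already-subscribed pre-materialized sink, which is why the lazy form is the one to reach for when the Flow is a reusable blueprint.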