scala akka akka-stream

Fetch Source[ByteString, Any] out of stream into 2 sinks


I am trying to duplicate an incoming Source[ByteString, Any] in an Akka Streams graph and feed it into 2 sinks. I get the desired input stream as 'is', but bs is not of type Source[ByteString, Any], and I am getting a boxed error.

private def duplicateStream(content: Source[ByteString, Any]): Future[Either[X, Y]] = {
  val sink = StreamConverters.asInputStream()
  val (is, bs): (InputStream, Future[ByteString]) = content
    .alsoToMat(sink)(Keep.right)
    .toMat(Sink.last)(Keep.both)
    .run()

  // is is the InputStream, which is what I want
  // bs should be a Source[ByteString, Any]
}

How can I get bs as a Source[ByteString, Any] out of this graph?


Solution

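  • This can be achieved with [Broadcast] in Akka Streams. Broadcast emits every incoming element to each of its outputs, so several downstream consumers can process the same stream independently. The example below fans one source out to three sinks: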

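    import java.util.concurrent.ThreadLocalRandom
    import scala.concurrent.Future
    import akka.NotUsed
    import akka.stream.ClosedShape
    import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
    
    // Assumes an implicit ActorSystem (or Materializer) is in scope for run().
    val source: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)
    
    val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, _) => acc + 1)
    // Seed min/max with the extreme values; seeding min with 0 would always yield 0 here.
    val minSink: Sink[Int, Future[Int]] = Sink.fold(Int.MaxValue)((acc, elem) => math.min(acc, elem))
    val maxSink: Sink[Int, Future[Int]] = Sink.fold(Int.MinValue)((acc, elem) => math.max(acc, elem))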
    
    val (count: Future[Int], min: Future[Int], max: Future[Int]) =
      RunnableGraph
        .fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
          implicit builder => (countS, minS, maxS) =>
            import GraphDSL.Implicits._
            val broadcast = builder.add(Broadcast[Int](3))
            source ~> broadcast
            broadcast.out(0) ~> countS
            broadcast.out(1) ~> minS
            broadcast.out(2) ~> maxS
            ClosedShape
        })
        .run()
    

    REF: https://doc.akka.io/docs/akka/current/stream/operators/Broadcast.html#broadcast
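Applied to the question's types, the same Broadcast pattern might look like the sketch below. It is not a definitive implementation: the name duplicateToSinks is hypothetical, and it assumes the whole payload fits in memory, because the second branch folds every chunk into one ByteString before Source.future re-exposes it as a Source[ByteString, NotUsed].

```scala
import java.io.InputStream
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.{ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source, StreamConverters}
import akka.util.ByteString

// Hypothetical helper: fans `content` out to an InputStream sink and to a
// sink that collects all bytes, then wraps the collected bytes in a Source.
def duplicateToSinks(content: Source[ByteString, Any])(
    implicit mat: Materializer): (InputStream, Source[ByteString, NotUsed]) = {
  val inputStreamSink: Sink[ByteString, InputStream] = StreamConverters.asInputStream()
  // Assumption: buffering the full content in memory is acceptable.
  val collectSink: Sink[ByteString, Future[ByteString]] =
    Sink.fold(ByteString.empty)((acc, chunk) => acc ++ chunk)

  val (is, collected) = RunnableGraph
    .fromGraph(
      GraphDSL.createGraph(inputStreamSink, collectSink)((stream, bytes) => (stream, bytes)) {
        implicit builder => (isS, collectS) =>
          import GraphDSL.Implicits._
          val broadcast = builder.add(Broadcast[ByteString](2))
          content ~> broadcast
          broadcast.out(0) ~> isS
          broadcast.out(1) ~> collectS
          ClosedShape
      })
    .run()

  // Emits the single concatenated ByteString once the original stream completes.
  (is, Source.future(collected))
}
```

Broadcast only advances when both branches have demand, so the InputStream has to be read (typically on a separate, blocking-friendly thread) for the collecting branch to finish. If the content is too large to buffer, this approach is probably not suitable.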