I am trying to put incoming Source[ByteString, Any]
into 2 sinks
& trying to duplicate the incoming stream in akka streaming graphs
. I am getting desired 'is'
as input stream but bs
in not of type Source[ByteString, Any]
. I am getting boxed error
.
private def duplicateStream(content: Source[ByteString, Any]): Future[Either[X, Y]] = {
val sink = StreamConverters.asInputStream()
val (is, bs): (InputStream, Future[ByteString]) = content
.alsoToMat(sink)(Keep.right)
.toMat(Sink.last)(Keep.both)
.run()
//is is input stream which is desired
//bs should be Source[ByteString, Any]
}
How can I get bs
asSource[ByteString, Any]
out of this graph?
This can be achieved using [Broadcast]
in akka streams
. It can be used to create multiple streams and use them independently.
val source: Source[Int, NotUsed] =
Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)
val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => acc + 1)
val minSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.min(acc, elem))
val maxSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.max(acc, elem))
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
RunnableGraph
.fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
implicit builder => (countS, minS, maxS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](3))
source ~> broadcast
broadcast.out(0) ~> countS
broadcast.out(1) ~> minS
broadcast.out(2) ~> maxS
ClosedShape
})
.run()
REF: https://doc.akka.io/docs/akka/current/stream/operators/Broadcast.html#broadcast