scala, akka, akka-stream, reactive-streams

Akka Source that emits when another Sink receives


I have a source a that emits values into a sink b.

Now I want to have another source c that emits a value every time b receives an event.

My idea was to use another sink d that can be used as a notifier, but then I need the functionality to create a Source from a Sink.

a.alsoTo(d).to(b)

something like

Source.from(d)


Solution

  • Another way of describing this is that you want every event emitted by a to go to both b and c. This is what a BroadcastHub does; it can be used to allow events from one Source to be consumed by multiple Sinks.

    If you connect a Source to a BroadcastHub.sink and then materialise it, you get a new Source. This Source can then be attached to 2 or more Sinks and each Sink will get a copy of the message sent by the original Source.

    For example, I use this with Akka to have an Actor that broadcasts messages to multiple clients (for gRPC events):

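    import akka.actor.typed.ActorRef
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{BroadcastHub, Keep, Source}
    import akka.stream.typed.scaladsl.ActorSource

    // Event is the application's own message type.
    // An implicit ActorSystem (or Materializer) must be in scope for run().
    val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
      ActorSource.actorRef[Event](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize = 16,
        overflowStrategy = OverflowStrategy.fail
      )
        .toMat(BroadcastHub.sink)(Keep.both)
        .run()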
    

    This creates eventSource, which can be used in a pipeline and materialised multiple times to create multiple streams. Each time a message is sent to the actorRef, every stream currently materialised from eventSource receives that message.

    See the Akka Streams documentation on BroadcastHub (in the section on dynamic stream handling) for more details.
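
Applied to the names in the question, the same idea might look like the sketch below. It is only an illustration under some assumptions: Akka 2.6 or later (so the implicit ActorSystem supplies the materializer), a placeholder source of five integers for a, and a hypothetical object name BroadcastHubSketch. The initial delay is there only to give both consumers time to attach, because a consumer of a BroadcastHub sees only the elements emitted after it has attached.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; none of these names come from the original post.
object BroadcastHubSketch {

  // Runs the demo and returns what b received and what c emitted.
  def run(): (Seq[Int], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-hub-sketch")

    // `a` from the question. The elements are placeholders; the delay only
    // gives both consumers below time to attach before the first element.
    val a: Source[Int, NotUsed] = Source(1 to 5).initialDelay(500.millis)

    // Running `a` into a BroadcastHub materialises a new, reusable Source.
    // The buffer size must be a power of two.
    val hubSource: Source[Int, NotUsed] =
      a.toMat(BroadcastHub.sink[Int](bufferSize = 8))(Keep.right).run()

    // `b` from the question: here a sink that collects what it receives.
    val receivedByB = hubSource.runWith(Sink.seq)

    // `c` from the question: a source fed by the same elements as `b`.
    val c: Source[String, NotUsed] = hubSource.map(n => s"b received $n")
    val emittedByC = c.runWith(Sink.seq)

    try {
      (Await.result(receivedByB, 5.seconds), Await.result(emittedByC, 5.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (b, c) = run()
    println(b)
    println(c)
  }
}
```

Note that c is driven by the elements of a rather than by b itself: the hub emits at the pace of its slowest attached consumer, so c and b stay roughly in step, but c is not an acknowledgement that b has finished processing an element.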