
akka.stream.scaladsl.Source that you can emit values into (similar to monix BehaviorSubject)


I'm looking for a way to construct an akka.stream.scaladsl.Source that lets me simply emit the next value from a different place in the code (e.g. while watching system events).

Currently I've implemented this using monix and akka-streams (code below), but I expect there should be an akka-streams-only solution:

import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import monix.reactive.subjects.BehaviorSubject

import scala.concurrent.Future

val bs = BehaviorSubject("") //monix subject is sink and source at the same time

//this is how it is currently implemented
def createSource() = { 
    val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
    Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}

//somewhere else in the code... some event happened
//this is how emitting works in monix
val res: Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit a value

Solution

  • Perhaps you are looking for ActorSource (from the akka-stream-typed module).

    An example from the docs:

    import akka.actor.typed.ActorRef
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{ Sink, Source }
    import akka.stream.typed.scaladsl.ActorSource
    
    trait Protocol
    case class Message(msg: String) extends Protocol
    case object Complete extends Protocol
    case class Fail(ex: Exception) extends Protocol
    
    val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
      case Complete =>
    }, failureMatcher = {
      case Fail(ex) => ex
    }, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)
    
    val ref = source
      .collect {
        case Message(msg) => msg
      }
      .to(Sink.foreach(println))
      .run() // needs an implicit ActorSystem (or Materializer) in scope
    
    ref ! Message("msg1")
    

    This way you can send messages to the materialized ActorRef from anywhere in your code, and each message will be emitted by the ActorSource down the stream.
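
To connect this back to the question, here is a minimal, self-contained sketch of emitting values "from elsewhere" through the materialized ActorRef. It assumes Akka 2.6+ with akka-actor-typed and akka-stream-typed on the classpath. The names `EmitFromElsewhere` and `emitAll`, the system name, and the 5 second timeout are made up for illustration and are not part of the docs example.

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Keep, Sink }
import akka.stream.typed.scaladsl.ActorSource

import scala.concurrent.Await
import scala.concurrent.duration._

// Same protocol as in the docs example above.
sealed trait Protocol
final case class Message(msg: String) extends Protocol
case object Complete extends Protocol
final case class Fail(ex: Exception) extends Protocol

// Hypothetical helper object, for illustration only.
object EmitFromElsewhere {

  // Runs a stream, pushes `events` into it via the ActorRef,
  // completes the stream and returns what the sink collected.
  def emitAll(events: Seq[String]): Seq[String] = {
    // The implicit ActorSystem also provides the stream materializer.
    implicit val system: ActorSystem[Nothing] =
      ActorSystem(Behaviors.empty, "emit-sketch")
    try {
      val (ref, received) = ActorSource
        .actorRef[Protocol](
          completionMatcher = { case Complete => },
          failureMatcher = { case Fail(ex) => ex },
          bufferSize = 8,
          overflowStrategy = OverflowStrategy.fail)
        .collect { case Message(msg) => msg }
        .toMat(Sink.seq)(Keep.both)
        .run()

      // `ref` can be handed to any other component (event watcher, callback, ...);
      // here we simply emit from the calling thread.
      events.foreach(event => ref ! Message(event))
      ref ! Complete

      Await.result(received, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(emitAll(Seq("Event #1471 just happened!")))
}
```

Note that, unlike `bs.onNext` in monix, `ref ! msg` returns nothing, so the sender gets no acknowledgement or backpressure signal; with `OverflowStrategy.fail` the stream fails if more than `bufferSize` elements pile up.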