I'm looking for an akka.stream.scaladsl.Source construction method that will let me emit the next value from a different place in the code (e.g. while watching system events).
I need something similar to Promise. A Promise emits a single value to a Future; I need to emit multiple values to a Source, like monix.reactive.subjects.BehaviorSubject.onNext(_).
Currently I've implemented this using monix and akka-streams (code below), but I expect there should be an akka-streams-only solution:
import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import monix.reactive.subjects.BehaviorSubject
import scala.concurrent.Future

val bs = BehaviorSubject("") // a monix subject is a sink and a source at the same time

// this is how it is currently implemented
def createSource() = {
  val s1 = Source.fromPublisher(bs.toReactivePublisher) // here we create a source from the subject
  Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}

// somewhere else in the code... some event happened
// this is how it works in monix
val res: Future[Ack] = bs.onNext("Event #1471 just happened!") // here we emit a value
Perhaps you are looking for an Actor Source.
An example from the docs:
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](
  completionMatcher = {
    case Complete =>
  },
  failureMatcher = {
    case Fail(ex) => ex
  },
  bufferSize = 8,
  overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")
This way you can send messages to the actor through the actor system, and those messages will be emitted from the ActorSource down the stream.
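To connect this back to the question, here is a minimal self-contained sketch of how the materialized ActorRef could play the role of bs.onNext. It assumes Akka 2.6+ with the akka-stream-typed module on the classpath. The names EventStreamSketch and onSystemEvent are hypothetical and only for illustration, and the buffer size and overflow strategy are copied from the docs example rather than tuned for any real workload.

```scala
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Keep, Sink }
import akka.stream.typed.scaladsl.ActorSource

// Hypothetical wrapper object, not part of Akka or of the original code.
object EventStreamSketch extends App {
  sealed trait Protocol
  final case class Message(msg: String) extends Protocol
  case object Complete extends Protocol
  final case class Fail(ex: Exception) extends Protocol

  // Assumption: Akka 2.6+, where an implicit ActorSystem is enough to run a stream.
  implicit val system: ActorSystem[Nothing] =
    ActorSystem(Behaviors.empty, "event-stream-sketch")

  // Materialize the stream once; keep the ActorRef (to push values in)
  // and the sink's Future (to learn when the stream has finished).
  val (ref, done) =
    ActorSource
      .actorRef[Protocol](
        completionMatcher = { case Complete => },
        failureMatcher = { case Fail(ex) => ex },
        bufferSize = 8,
        overflowStrategy = OverflowStrategy.fail)
      .collect { case Message(msg) => msg }
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // Hypothetical callback: any code holding `ref` can emit the next value.
  def onSystemEvent(description: String): Unit =
    ref ! Message(description)

  onSystemEvent("Event #1471 just happened!")

  // Signal completion, then shut the actor system down once the stream ends.
  ref ! Complete
  done.onComplete(_ => system.terminate())(system.executionContext)
}
```

Unlike bs.onNext, the tell operator (!) returns Unit rather than a Future[Ack], so the sender gets no backpressure signal. With OverflowStrategy.fail, the stream fails if more than bufferSize elements are waiting, so choose the buffer and strategy to match how bursty the events are.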