filter (which uses halt inside) terminates the other branch even if it has side effects:
scala> val p = Process("1","2", "3")
scala> val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> val p2 = p.filter(_ => false).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
scala> val p2 = p.filter(_ => true).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
1p2
2p1
2p2
3p1
3p2
This seems logical, as there is no value to return to yip after that filter. But what about the side effects specified with observe?
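To make the observed behaviour concrete, here is a rough analogy in plain Scala iterators. It is not scalaz-stream and not how yip is actually implemented; ZipHaltModel, zipPull, observed and log are hypothetical names invented for this sketch. It only assumes a zip that pulls one element from the left, then one from the right, and stops as soon as either side has nothing left.

```scala
import scala.collection.mutable.ListBuffer

object ZipHaltModel {
  // Records the "side effects" in the order they fire (stands in for io.stdOutLines).
  val log = ListBuffer.empty[String]

  // Hypothetical pull-based zip: take one element from the left, then one from
  // the right, and stop as soon as either side is exhausted.
  def zipPull[A, B](left: Iterator[A], right: Iterator[B]): List[(A, B)] = {
    val out = List.newBuilder[(A, B)]
    var running = true
    while (running) {
      if (!left.hasNext) running = false
      else {
        val a = left.next()                 // the left side effect fires here
        if (!right.hasNext) running = false // right is empty: a is dropped, zip ends
        else out += ((a, right.next()))
      }
    }
    out.result()
  }

  // Stand-in for observe: log each element as it is pulled through.
  def observed[A](it: Iterator[A]): Iterator[A] =
    it.map { x => log += x.toString; x }

  def run(keepRight: String => Boolean): List[(String, String)] = {
    log.clear()
    val left  = observed(Iterator("1", "2", "3").filter(_ => true).map(_ + "p1"))
    val right = observed(Iterator("1", "2", "3").filter(keepRight).map(_ + "p2"))
    zipPull(left, right)
  }
}
```

With ZipHaltModel.run(_ => false) the log contains only 1p1, because the right side never produces a partner element and the zip ends before 2 and 3 are pulled. With ZipHaltModel.run(_ => true) the log interleaves 1p1, 1p2, 2p1, 2p2, 3p1, 3p2, as in the second transcript.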
My current solution is to use flatMap to specify a default value:
scala> val p1 = p.map(_ + "p1").flatMap(x => Process.emit(x).observe(io.stdOutLines))
scala> val p2 = p.map(_ + "p2").flatMap(x => Process.emit(""))
scala> (p1 yip p2).run.run
1p1
2p1
3p1
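The same idea, sketched with plain Scala iterators rather than scalaz-stream: instead of dropping elements, the filtered branch emits a placeholder, so both sides stay the same length and the zip never ends early. This is only an analogy under assumptions; PlaceholderZip, keepRight and log are hypothetical names, and None is swapped in here for the empty-string default used above.

```scala
import scala.collection.mutable.ListBuffer

object PlaceholderZip {
  // Records the "side effects" in the order they fire.
  val log = ListBuffer.empty[String]

  // Hypothetical predicate for the right branch; it rejects everything,
  // like filter(_ => false) in the transcript.
  def keepRight(s: String): Boolean = false

  def run(): List[(String, Option[String])] = {
    log.clear()
    // Left branch: every element is kept and logged.
    val left = Iterator("1", "2", "3").map(_ + "p1").map { x => log += x; x }
    // Right branch: rejected elements become None instead of disappearing,
    // and only accepted elements are logged.
    val right = Iterator("1", "2", "3").map(_ + "p2").map { x =>
      if (keepRight(x)) { log += x; Some(x) } else None
    }
    left.zip(right).toList
  }
}
```

Calling PlaceholderZip.run() logs 1p1, 2p1, 3p1 and returns three pairs whose right-hand side is None; the consumer then decides what to do with the placeholders.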
But maybe there is a way to use filter?
P.S. The merge combinator executes the side effects of the other branch (as it doesn't require a value to be returned), but it doesn't wait for the other branch if one halts (even if it has side effects).
Actually, it should be something like this:
in.map(emit).flatMap { p =>
  val p1 = p.map(_ + "p1").filter(_ => true).observe(out)
  val p2 = p.map(_ + "p2").filter(_ => false).observe(out)
  p1 merge p2
}.run.run
This keeps all side effects in order, as filter can't receive more than one value (the one produced by emit).