Consider the following code:
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val in = Source(0 to 10)
      val fanOut = builder.add(Broadcast[Int](2))
      val toString = builder.add(Flow[Int].map(_.toString))
      val squared = builder.add(Flow[Int].map(x => x * x))
      val zip = builder.add(ZipLatestWith((str: String, sqr: Int) => (str, sqr)))
      val out = Sink.ignore
      in ~> fanOut ~> toString ~> zip ~> out
            fanOut ~> squared  ~> zip
      ClosedShape
    }
I get an error in `zip ~> out` stating that "overloaded method ~> can't be applied to FanInShape2[String, Int, (String, Int)]". I can, of course, rewrite that graph composition in the following manner:
    in ~> fanOut ~> toString ~> zip.in0; zip.out ~> out
          fanOut ~> squared  ~> zip.in1
But I have seen tutorials that show branching defined without the `.in` and `.out` specifics. Is there a limitation in the DSL regarding Zip stages, or am I doing something wrong?
It's a limitation of the DSL for zip stages relative to the merge/concat stages. Merge and concat take inlets of a single type, so the DSL can treat the first inlet to be attached as effectively `in0`. The inlets of a zip stage can have different types, so that approach won't preserve type safety: there's no way in Scala to express "the first usage of `in` takes a `Foo` and the second takes a `Bar`".

While it would be possible to define a `Zip` operator that zips two inputs of the same type, there's also a case to be made that being explicit about which inlet maps to which half of the pair is valuable: it prevents a change in the order in which the graph is constructed from changing the meaning of the elements in the stream (e.g. if zipping `(x, y)` coordinates, an unexpected change of order would be a diagonal reflection).
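To make the contrast concrete, here is a minimal sketch that wires a `Merge` without naming ports and a `ZipLatestWith` with explicit ports. It assumes Akka 2.6 on Scala 2.13; the object name `ZipPortsSketch`, the method names, the `asString` value (renamed from `toString` to avoid shadowing) and the use of `Sink.seq` in place of `Sink.ignore` are illustrative choices, not part of the question. Newer versions may prefer `GraphDSL.createGraph` over the `create` overload used here.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source, ZipLatestWith}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for this illustration.
object ZipPortsSketch {

  // Merge is a uniform fan-in: every inlet has the same element type,
  // so `~> merge` can bind each incoming edge to the next free inlet.
  def mergeWithoutPorts()(implicit system: ActorSystem): Seq[Int] = {
    val graph = GraphDSL.create(Sink.seq[Int]) { implicit builder => out =>
      import GraphDSL.Implicits._
      val merge = builder.add(Merge[Int](2))
      Source(1 to 3) ~> merge ~> out
      Source(4 to 6) ~> merge
      ClosedShape
    }
    Await.result(RunnableGraph.fromGraph(graph).run(), 5.seconds)
  }

  // The zip stage has a FanInShape2 with differently typed inlets,
  // so each edge names its port: in0 takes String, in1 takes Int.
  def zipWithPorts()(implicit system: ActorSystem): Seq[(String, Int)] = {
    val graph = GraphDSL.create(Sink.seq[(String, Int)]) { implicit builder => out =>
      import GraphDSL.Implicits._
      val fanOut   = builder.add(Broadcast[Int](2))
      val asString = builder.add(Flow[Int].map(_.toString))
      val squared  = builder.add(Flow[Int].map(x => x * x))
      val zip      = builder.add(ZipLatestWith((str: String, sqr: Int) => (str, sqr)))

      Source(0 to 10) ~> fanOut ~> asString ~> zip.in0
                         fanOut ~> squared  ~> zip.in1
      zip.out ~> out
      ClosedShape
    }
    Await.result(RunnableGraph.fromGraph(graph).run(), 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("zip-ports-sketch")
    try {
      println(mergeWithoutPorts())
      println(zipWithPorts())
    } finally system.terminate()
  }
}
```

Swapping `zip.in0` and `zip.in1` in the second graph should fail to compile, because `asString` emits `String` and `zip.in1` expects `Int`. That compile-time check is what the port-free `~> zip` form could not offer.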