We're using micronaut/kafka-streams. To create a streams application with this framework, you build something like this:
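@Factory
public class FooTopologyConfig {

    @Singleton
    @Named
    public KStream<String, FooPojo> configureTopology(ConfiguredStreamBuilder builder) {
        // The framework supplies the builder; keep a handle on the source stream.
        KStream<String, FooPojo> stream = builder.stream("foo-topic-in");
        stream.peek((k, v) -> System.out.println(String.format("key %s, value: %s", k, v)))
              .to("foo-topic-out");
        // to() returns void, so the source stream is what gets returned as the bean.
        return stream;
    }
}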
This method is handed a ConfiguredStreamBuilder (a very light wrapper around StreamsBuilder). ConfiguredStreamBuilder::build() (which invokes the same on StreamsBuilder) is called later by the framework, and the returned Topology is not made available for injection by Micronaut.
We want the Topology bean in order to log a description of the topology (via Topology::describe).
Is it safe to do the following?
1. Call ConfiguredStreamBuilder::build (and therefore StreamsBuilder::build) and use the returned instance of Topology to print a human-readable description.
2. Let the framework call ConfiguredStreamBuilder::build for a second time later, and use the second instance of the returned topology to build the application.

There should be no problem calling build() multiple times. This is common in the internal code of Streams as well as in the tests.
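As a rough illustration of calling build() twice, here is a minimal sketch using a plain StreamsBuilder. It assumes ConfiguredStreamBuilder simply delegates build() to StreamsBuilder, as described in the question. The class name TopologyDescribeSketch and its helper methods are hypothetical, and String values stand in for FooPojo to keep the example self-contained.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyDescribeSketch {

    // Builds the same shape of topology as in the question (String values instead of FooPojo).
    static StreamsBuilder newBuilder() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("foo-topic-in")
               .peek((k, v) -> System.out.println(String.format("key %s, value: %s", k, v)))
               .to("foo-topic-out");
        return builder;
    }

    // Calls build() twice on the same builder and returns both descriptions.
    static String[] describeTwice() {
        StreamsBuilder builder = newBuilder();
        Topology forLogging = builder.build();   // first call: only used for logging
        String first = forLogging.describe().toString();
        Topology forRunning = builder.build();   // second call: what the framework would do later
        String second = forRunning.describe().toString();
        return new String[] {first, second};
    }

    public static void main(String[] args) {
        String[] descriptions = describeTwice();
        System.out.println(descriptions[0]);
        System.out.println("Descriptions match: " + descriptions[0].equals(descriptions[1]));
    }
}
```

No broker is needed to run this, since describing a topology does not start a KafkaStreams instance.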
To answer your other question: you only need the stream returned by builder.stream() operations if you want to expand on that branch of the topology later.
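To make "expanding on that branch later" concrete, here is a small sketch in which the KStream returned by builder.stream() is kept and reused to attach a second branch. The class name BranchReuseSketch, the filter condition, and the topic foo-topic-filtered are hypothetical and only serve the illustration. String values are again assumed in place of FooPojo.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class BranchReuseSketch {

    static String describeBranches() {
        StreamsBuilder builder = new StreamsBuilder();

        // Keep the reference so more processing can hang off the same source.
        KStream<String, String> source = builder.stream("foo-topic-in");
        source.to("foo-topic-out");

        // A second branch added later from the same stream (hypothetical topic name).
        source.filter((k, v) -> v != null && !v.isEmpty())
              .to("foo-topic-filtered");

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeBranches());
    }
}
```

If nothing else is ever attached to the source, the reference can be discarded; the builder already holds everything it needs to produce the topology.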