Given a simple Apache Storm topology that uses the Stream API, there are two ways of initializing a Stream:
Version 1 - implicit declaration
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5)
.print();
Result: This worked as expected; it only prints integers > 5.
Version 2 - explicit declaration
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
integerStream.filter(x -> x > 5);
integerStream.print();
Result: This did not work. All tuples were printed, including integers <= 5.
Question: Why does the explicit declaration not work properly, and how can it be fixed?
The topologies were run on a local cluster with the following commands, where IntSpout is just a simple spout that emits random integers:
StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", new HashMap<>(), topo);
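That's because integerStream.filter(x -> x > 5); returns a new stream that you ignore. The filter does not modify integerStream itself, so the subsequent integerStream.print(); is attached to the original, unfiltered stream.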
This works:
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();
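To see the same effect without a Storm cluster, here is a minimal sketch. MiniStream is a hypothetical stand-in written only for this illustration, not part of Storm; it merely assumes what the answer above describes, namely that filter returns a new object and leaves the receiver untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class IgnoredFilterDemo {

    // Hypothetical stand-in for a stream (NOT a Storm class): every
    // operation returns a NEW object and never changes the receiver.
    static final class MiniStream<T> {
        private final List<T> items;

        MiniStream(List<T> items) {
            this.items = List.copyOf(items);
        }

        // Returns a new MiniStream holding only the matching items.
        MiniStream<T> filter(Predicate<T> predicate) {
            List<T> kept = new ArrayList<>();
            for (T item : items) {
                if (predicate.test(item)) {
                    kept.add(item);
                }
            }
            return new MiniStream<>(kept);
        }

        // Exposes the (immutable) contents so they can be printed.
        List<T> collect() {
            return items;
        }
    }

    public static void main(String[] args) {
        MiniStream<Integer> integerStream = new MiniStream<>(List.of(3, 7, 5, 9));

        // Same mistake as Version 2: the filtered result is thrown away.
        integerStream.filter(x -> x > 5);
        System.out.println(integerStream.collect()); // prints [3, 7, 5, 9]

        // Keep the returned object and use it instead.
        MiniStream<Integer> filteredStream = integerStream.filter(x -> x > 5);
        System.out.println(filteredStream.collect()); // prints [7, 9]
    }
}
```

Version 1 works for the same reason: in a chained call, each method is invoked on the object returned by the previous one, so print() is applied to the filtered stream.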
There is also a syntax error in your first example. It has an extra semicolon at the end of the fourth line.
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5) // <= there was a semicolon here
.print();