The minimal working example below produces events as fast as possible; the events then update an IMap. The IMap in turn emits update events from its journal.
// Imports assume Hazelcast Jet 4.x (the Pipeline.readFrom/writeTo API).
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.map.IMap;

import java.util.concurrent.atomic.AtomicLong;

import static com.hazelcast.jet.pipeline.JournalInitialPosition.START_FROM_OLDEST;

public class FastIMapExample {

    private static final int NUMBER_OF_GROUPS = 10;
    private static final int NUMBER_OF_EVENTS = 1000;

    public static void main(String[] args) {
        JetInstance jet = Jet.newJetInstance();
        IMap<Long, Long> groups = jet.getMap("groups");

        // p1: produce events as fast as possible and count them per group in the IMap
        Pipeline p1 = Pipeline.create();
        p1.readFrom(fastStreamOfLongs(NUMBER_OF_EVENTS))
          .withoutTimestamps()
          .writeTo(Sinks.mapWithUpdating(groups,
                  event -> event % NUMBER_OF_GROUPS,
                  (oldState, event) -> increment(oldState)
          ));

        // p2: log every update event emitted by the IMap's journal
        Pipeline p2 = Pipeline.create();
        p2.readFrom(Sources.mapJournal(groups, START_FROM_OLDEST))
          .withIngestionTimestamps()
          .map(x -> x.getKey() + " -> " + x.getValue())
          .writeTo(Sinks.logger());

        jet.newJob(p2);
        jet.newJob(p1).join();
    }

    private static StreamSource<Long> fastStreamOfLongs(int numberOfEvents) {
        return SourceBuilder
                .stream("fast-longs", ctx -> new AtomicLong(0))
                .<Long>fillBufferFn((num, buf) -> {
                    long val = num.getAndIncrement();
                    if (val < numberOfEvents) buf.add(val);
                })
                .build();
    }

    private static long increment(Long x) {
        return x == null ? 1 : x + 1;
    }
}
Example output:
3 -> 7
3 -> 50
3 -> 79
7 -> 42
...
6 -> 100
0 -> 82
9 -> 41
9 -> 100
I was expecting to see exactly 1000 events, one for each update. Instead I see about 50-80 events. (It seems that the output contains the latest update (i.e. "-> 100") from each group, but otherwise it is a random subset.)
When NUMBER_OF_GROUPS equals NUMBER_OF_EVENTS (or when event generation is artificially slowed down) I receive all 1000 updates.
Is this behaviour expected? Is it possible to receive all update events from the fast source?
Sinks.mapWithUpdating uses batching, so some of the updates are applied locally before the actual update entry processor is sent. You need to use Sinks.mapWithEntryProcessor to send a separate update entry processor for each and every item.
From the JavaDoc of Sinks.mapWithEntryProcessor:
* As opposed to {@link #mapWithUpdating} and {@link #mapWithMerging},
* this sink does not use batching and submits a separate entry processor
* for each received item. For use cases that are efficiently solvable
* using those sinks, this one will perform worse. It should be used only
* when they are not applicable.
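Applied to your example, the change could look like the sketch below (assuming Hazelcast Jet 4.x, where EntryProcessor lives in com.hazelcast.map, takes a third type parameter for its return type, and extends Serializable; IncrementEntryProcessor is a name I made up for illustration):

import com.hazelcast.map.EntryProcessor;
import java.util.Map;

// p1 rewritten to submit one entry processor per event, so each
// update is applied individually and produces its own journal event.
p1.readFrom(fastStreamOfLongs(NUMBER_OF_EVENTS))
  .withoutTimestamps()
  .writeTo(Sinks.mapWithEntryProcessor(groups,
          event -> event % NUMBER_OF_GROUPS,
          event -> new IncrementEntryProcessor()));

// A serializable entry processor applying the same increment as before.
public class IncrementEntryProcessor implements EntryProcessor<Long, Long, Void> {
    @Override
    public Void process(Map.Entry<Long, Long> entry) {
        Long old = entry.getValue();
        entry.setValue(old == null ? 1 : old + 1);
        return null;
    }
}

As the quoted JavaDoc warns, this trades throughput for completeness: you get one journal event per item, but each item costs a separate entry processor invocation.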
Keep in mind that the default capacity of the event journal is 10K. With the default partition count of 271 that works out to roughly 36 events per partition, which is not enough to store all the updates at once. The capacity is split evenly among the partitions, and in the worst case all 10 group keys could land in the same partition, which would then have to hold all 1,000 updates. So with the default partition count you need to set the capacity to 271K or higher to be sure every update is retained.
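If you want to raise the capacity programmatically, it could look like this sketch (assuming Hazelcast Jet 4.x, where the journal is configured through the map config; the map name and the 271K figure come from the example above):

import com.hazelcast.jet.config.JetConfig;

JetConfig jetConfig = new JetConfig();
jetConfig.getHazelcastConfig()
         .getMapConfig("groups")
         .getEventJournalConfig()
         .setEnabled(true)       // the journal must be enabled for Sources.mapJournal
         .setCapacity(271_000);  // 271_000 / 271 partitions = 1_000 events per partition
JetInstance jet = Jet.newJetInstance(jetConfig);

Also note the journal is a ring buffer: once it is full, new events overwrite the oldest ones, so the capacity only needs to cover the lag between the writing job and the reading job.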