I couldn't find a way to create a pipeline in hazelcast-jet-kafka that limits the throughput to a specific number of elements per time unit. Could anybody suggest possible solutions? I know that Alpakka Kafka (https://doc.akka.io/docs/alpakka-kafka/current/) has such functionality.
You can define this function:
private <T, S extends GeneralStage<T>> FunctionEx<S, S> throttle(int itemsPerSecond) {
    // context for the mapUsingService stage
    class Service {
        final int ratePerSecond;
        final TreeMap<Long, Long> counts = new TreeMap<>();

        public Service(int ratePerSecond) {
            this.ratePerSecond = ratePerSecond;
        }
    }

    // factory for the service
    ServiceFactory<?, Service> serviceFactory = ServiceFactories
            .nonSharedService(procCtx ->
                    // divide the rate by the actual number of processors we have
                    new Service(Math.max(1, itemsPerSecond / procCtx.totalParallelism())))
            // non-cooperative is needed because we sleep in the mapping function
            .toNonCooperative();

    return stage -> (S) stage
            .mapUsingService(serviceFactory,
                    (ctx, item) -> {
                        // current time in 10ths of a second
                        long now = System.nanoTime() / 100_000_000;
                        // include this item in the counts
                        ctx.counts.merge(now, 1L, Long::sum);
                        // clear items emitted more than a second ago
                        ctx.counts.headMap(now - 10, true).clear();
                        long countInLastSecond =
                                ctx.counts.values().stream().mapToLong(Long::longValue).sum();
                        // if we emitted too many items, sleep a while
                        if (countInLastSecond > ctx.ratePerSecond) {
                            Thread.sleep(
                                    (countInLastSecond - ctx.ratePerSecond) * 1000 / ctx.ratePerSecond);
                        }
                        // now we can pass the item on
                        return item;
                    }
            );
}
Then use it to throttle in the pipeline:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 2_000).boxed().toArray(Integer[]::new)))
 .apply(throttle(100))
 .writeTo(Sinks.noop());
The job above takes about 20 seconds to complete because it has 2000 items and the rate is limited to 100 items/s. The rate is evaluated over the last second, so if there are fewer than 100 items/s, items are forwarded immediately. If 101 items arrive within one millisecond, 100 are forwarded immediately and the next one after a sleep.
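To see the window arithmetic in isolation, here is a minimal sketch that repeats the same bookkeeping outside of Jet. The class name SlidingWindowThrottle and the method record are hypothetical, made up for this illustration; the time is passed in as a parameter instead of being read from System.nanoTime(), and the method returns the sleep time instead of sleeping, so the behaviour is easy to check.

```java
import java.util.TreeMap;

// Hypothetical helper, not part of Jet: it mirrors the logic of the mapping function above.
class SlidingWindowThrottle {
    private final int ratePerSecond;
    // key: time bucket in 10ths of a second, value: number of items seen in that bucket
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    SlidingWindowThrottle(int ratePerSecond) {
        this.ratePerSecond = ratePerSecond;
    }

    // Records one item at the given time (in 10ths of a second) and
    // returns how many milliseconds the caller should sleep before forwarding it.
    long record(long nowTenths) {
        counts.merge(nowTenths, 1L, Long::sum);
        // drop buckets that are a second old or older
        counts.headMap(nowTenths - 10, true).clear();
        long countInLastSecond = counts.values().stream().mapToLong(Long::longValue).sum();
        if (countInLastSecond > ratePerSecond) {
            return (countInLastSecond - ratePerSecond) * 1000 / ratePerSecond;
        }
        return 0;
    }

    public static void main(String[] args) {
        SlidingWindowThrottle throttle = new SlidingWindowThrottle(100);
        long delay = 0;
        // 101 items in the same time bucket: only the last one is delayed
        for (int i = 0; i < 101; i++) {
            delay = throttle.record(0);
        }
        System.out.println("delay for item 101: " + delay + " ms");
        // two seconds later the old bucket has expired, so there is no delay
        System.out.println("delay after 2 s: " + throttle.record(20) + " ms");
    }
}
```

With a rate of 100, the 101st item in the same bucket gets a delay of (101 - 100) * 1000 / 100 = 10 ms, and an item arriving two seconds later gets no delay.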
Also make sure that your source is distributed. The rate is divided by the total number of processors in the cluster, so if your source isn't distributed and some members don't see any data, your overall rate will be only a fraction of the desired rate.
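As a rough illustration of that effect, the sketch below repeats the division from the service factory in plain Java. The class RateSplit and its methods are hypothetical names used only here, and the parallelism values are made-up examples, not something read from a real cluster.

```java
// Hypothetical helper, not part of Jet: it only reproduces the arithmetic of the rate split.
class RateSplit {
    // same formula as in the service factory above
    static int perProcessorRate(int itemsPerSecond, int totalParallelism) {
        return Math.max(1, itemsPerSecond / totalParallelism);
    }

    // overall rate if only some of the processors actually receive data
    static int effectiveRate(int itemsPerSecond, int totalParallelism, int activeProcessors) {
        return perProcessorRate(itemsPerSecond, totalParallelism) * activeProcessors;
    }

    public static void main(String[] args) {
        // assumed example: 100 items/s requested, 8 processors in total
        System.out.println(perProcessorRate(100, 8));   // 12 per processor
        System.out.println(effectiveRate(100, 8, 8));   // 96 when all processors see data
        System.out.println(effectiveRate(100, 8, 2));   // 24 when only 2 processors see data
    }
}
```

The integer division also means the total can land slightly below the requested rate (96 instead of 100 here), and because of Math.max(1, ...) it can exceed it when the requested rate is smaller than the total parallelism.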