hazelcast-jet

Hazelcast Jet Kafka throttling


I couldn't find a way to create a pipeline with hazelcast-jet-kafka that limits the throughput to a specific number of elements per time unit. Could anybody suggest a possible solution? I know that Alpakka (https://doc.akka.io/docs/alpakka-kafka/current/) has such functionality.


Solution

  • You can define this function:

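    // imports, assuming the Jet 4.x package layout
    import com.hazelcast.function.FunctionEx;
    import com.hazelcast.jet.pipeline.GeneralStage;
    import com.hazelcast.jet.pipeline.ServiceFactories;
    import com.hazelcast.jet.pipeline.ServiceFactory;
    import java.util.TreeMap;

    private <T, S extends GeneralStage<T>> FunctionEx<S, S> throttle(int itemsPerSecond) {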
        // context for the mapUsingService stage
        class Service {
            final int ratePerSecond;
            final TreeMap<Long, Long> counts = new TreeMap<>();
    
            public Service(int ratePerSecond) {
                this.ratePerSecond = ratePerSecond;
            }
        }
    
        // factory for the service
        ServiceFactory<?, Service> serviceFactory = ServiceFactories
                .nonSharedService(procCtx ->
                        // divide the rate by the total number of processors we have
                        new Service(Math.max(1, itemsPerSecond / procCtx.totalParallelism())))
                // non-cooperative is needed because we sleep in the mapping function
                .toNonCooperative();
    
        return stage -> (S) stage
            .mapUsingService(serviceFactory,
                (ctx, item) -> {
                    // current time in 10ths of a second
                    long now = System.nanoTime() / 100_000_000;
                    // include this item in the counts
                    ctx.counts.merge(now, 1L, Long::sum);
                    // drop counts for items emitted more than a second ago
                    ctx.counts.headMap(now - 10, true).clear();
                    long countInLastSecond =
                            ctx.counts.values().stream().mapToLong(Long::longValue).sum();
                    // if we emitted too many items, sleep a while
                    if (countInLastSecond > ctx.ratePerSecond) {
                        Thread.sleep(
                            (countInLastSecond - ctx.ratePerSecond) * 1000/ctx.ratePerSecond);
                    }
                    // now we can pass the item on
                    return item;
                }
            );
    }
    

    Then use it to throttle in the pipeline:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items(IntStream.range(0, 2_000).boxed().toArray(Integer[]::new)))
     .apply(throttle(100))
     .writeTo(Sinks.noop());
    

    The above job will take about 20 seconds to complete because it has 2000 items and the rate is limited to 100 items/s. The rate is evaluated over the last second, so if there are fewer than 100 items/s, items will be forwarded immediately. If 101 items arrive within one millisecond, the first 100 will be forwarded immediately and the 101st after a short sleep.
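
    To make the 101-items case concrete, here is a minimal stand-alone sketch of the same sliding-window bookkeeping, without any Jet dependency. The class `SlidingWindowThrottle` and its method `delayMillis` are hypothetical names introduced for illustration only. Instead of sleeping, the method returns the delay it would sleep for, and the time is passed in as a parameter so the behaviour is easy to check.

```java
import java.util.TreeMap;

// Hypothetical stand-alone version of the window logic; not part of the Jet API.
class SlidingWindowThrottle {
    private final int ratePerSecond;
    // key: time bucket in 10ths of a second, value: items counted in that bucket
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    SlidingWindowThrottle(int ratePerSecond) {
        this.ratePerSecond = ratePerSecond;
    }

    /**
     * Records one item arriving at the given time and returns how many
     * milliseconds the caller should wait before forwarding it (0 = forward now).
     */
    long delayMillis(long nowNanos) {
        long bucket = nowNanos / 100_000_000;       // current time in 10ths of a second
        counts.merge(bucket, 1L, Long::sum);        // include this item in the counts
        counts.headMap(bucket - 10, true).clear();  // drop buckets older than about a second
        long countInLastSecond =
                counts.values().stream().mapToLong(Long::longValue).sum();
        if (countInLastSecond <= ratePerSecond) {
            return 0;
        }
        // the further over the limit we are, the longer the wait
        return (countInLastSecond - ratePerSecond) * 1000 / ratePerSecond;
    }

    public static void main(String[] args) {
        SlidingWindowThrottle throttle = new SlidingWindowThrottle(100);
        long now = 0; // all 101 items arrive at the same instant
        for (int i = 1; i <= 101; i++) {
            long delay = throttle.delayMillis(now);
            if (delay > 0) {
                System.out.println("item " + i + " delayed by " + delay + " ms");
            }
        }
        // prints: item 101 delayed by 10 ms
    }
}
```

    With a limit of 100 items/s, the first 100 items get a delay of 0 and the 101st gets (101 - 100) * 1000 / 100 = 10 ms, which matches the formula passed to `Thread.sleep` in `throttle`.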

    Also make sure that your source is distributed. The configured rate is divided among all processors in the cluster, so if your source isn't distributed and some members don't see any data, your overall rate will be only a fraction of the desired rate.
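
    A small worked example of that division, mirroring the `Math.max(1, itemsPerSecond / procCtx.totalParallelism())` expression in `throttle`. The class `RateSplit`, its methods, and the cluster size of 8 processors are assumptions made up for illustration.

```java
// Hypothetical helper for illustration; not part of the Jet API.
class RateSplit {
    /** Rate each processor enforces locally (integer division, at least 1). */
    static int perProcessorRate(int itemsPerSecond, int totalParallelism) {
        return Math.max(1, itemsPerSecond / totalParallelism);
    }

    /** Overall rate when only activeProcessors of them actually receive items. */
    static int effectiveRate(int itemsPerSecond, int totalParallelism, int activeProcessors) {
        return perProcessorRate(itemsPerSecond, totalParallelism) * activeProcessors;
    }

    public static void main(String[] args) {
        // assumed cluster: 2 members x 4 local processors = 8 processors in total
        System.out.println(perProcessorRate(100, 8)); // 12
        System.out.println(effectiveRate(100, 8, 8)); // 96: all processors receive data
        System.out.println(effectiveRate(100, 8, 1)); // 12: only one processor receives data
        System.out.println(effectiveRate(4, 8, 8));   // 8: each processor is floored to 1 item/s
    }
}
```

    Two side effects follow from the integer arithmetic: the overall rate can fall slightly below the requested one (96 instead of 100 here), and a requested rate lower than the total parallelism is effectively raised, because each processor allows at least 1 item/s.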