java disruptor-pattern lmax

Using LMAX Disruptor (3.0) in Java to process millions of documents


I have the following use-case:

When my service starts, it may need to process millions of documents in as short a burst as possible. There will be three sources of data.

I have set up the following:

    // batchSize = 100, bufferSize = 2^30
    public MyDisruptor(@NonNull final MyDisruptorConfig config) {
        batchSize = config.getBatchSize();
        bufferSize = config.getBufferSize();
        this.eventHandler = config.getEventHandler();
        ThreadFactory threadFactory = createThreadFactory("disruptor-threads-%d");
        executorService = Executors.newSingleThreadExecutor(threadFactory);
        ringBuffer = RingBuffer.createMultiProducer(new EventFactory(), bufferSize, new YieldingWaitStrategy());
        sequenceBarrier = ringBuffer.newBarrier();
        batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, eventHandler);
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
        executorService.submit(batchEventProcessor);
    }

    public void consume(@NonNull final List<Document> documents) {
        List<List<Document>> subLists = Lists.partition(documents, batchSize);
        for (List<Document> subList : subLists) {
            log.info("publishing sublist of size {}", subList.size());
            long high = ringBuffer.next(subList.size());
            long low = high - (subList.size() - 1);
            long position = low;
            for (Document document: subList) {
                ringBuffer.get(position++).setEvent(document);
            }
            ringBuffer.publish(low, high);
            lastPublishedSequence.set(high);
        }
    }

Each of my sources calls consume; I use Guice to create a singleton disruptor.

My eventHandler routine is:

    public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
        Document document = event.getValue();
        handler.processDocument(document); //send the document to handler
        if (endOfBatch) {
            handler.processDocumentsList(); // tell handler to process all documents so far.
        }
    }

I am seeing in my logs that the producer (consume) stalls at times. I assume this happens when the ring buffer is full and the eventHandler cannot process events quickly enough. My logs show the eventHandler processing documents, and after a while the producer resumes publishing documents to the ring buffer.

Questions:


Solution

  • Is your handler stateful? If not, you can use multiple parallel event handlers to process the documents. You could implement a basic sharding strategy where only one of the handlers processes each event.

    endOfBatch is usually used to speed up processing by optimising I/O operations that benefit from batching, e.g. writing to a file on each event but only flushing on endOfBatch.

    It's hard to give any more advice without knowing what happens in your document processor.
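As a rough illustration of the sharding idea, here is a minimal sketch. It assumes the handler keeps no state that must span documents and that processing order across shards does not matter. To keep it runnable without the Disruptor jar, `EventHandler` here is a local stand-in that mirrors the shape of `com.lmax.disruptor.EventHandler`; `DocumentEvent`, `ShardedHandler` and `ShardingDemo` are hypothetical names, and the "processing" is just collecting strings. Every handler sees every event, but each one only acts on the sequences it owns (`sequence % shardCount == ordinal`). Note that endOfBatch can arrive on a sequence owned by another shard, so the flush check sits outside the ownership check.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the shape of com.lmax.disruptor.EventHandler<T>,
// so this sketch compiles without the Disruptor library on the classpath.
interface EventHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

// Hypothetical event type; the Event class from the question is not shown in full.
class DocumentEvent {
    final String document;
    DocumentEvent(String document) { this.document = document; }
}

// Handles only the sequences assigned to this shard and flushes on endOfBatch.
class ShardedHandler implements EventHandler<DocumentEvent> {
    private final long ordinal;      // index of this handler: 0 .. shardCount - 1
    private final long shardCount;   // total number of parallel handlers
    private final List<String> pending = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>(); // recorded for inspection

    ShardedHandler(long ordinal, long shardCount) {
        this.ordinal = ordinal;
        this.shardCount = shardCount;
    }

    @Override
    public void onEvent(DocumentEvent event, long sequence, boolean endOfBatch) {
        // Only one handler owns each sequence; the others skip it.
        if (sequence % shardCount == ordinal) {
            pending.add(event.document);
        }
        // endOfBatch may fall on a sequence owned by another shard,
        // so flush regardless of who owns the current event.
        if (endOfBatch && !pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }
}

class ShardingDemo {
    // Simulates parallel handlers by delivering every event to every handler.
    static List<ShardedHandler> run(int shardCount, int eventCount, int batch) {
        List<ShardedHandler> handlers = new ArrayList<>();
        for (int i = 0; i < shardCount; i++) {
            handlers.add(new ShardedHandler(i, shardCount));
        }
        for (long seq = 0; seq < eventCount; seq++) {
            boolean endOfBatch = (seq + 1) % batch == 0 || seq == eventCount - 1;
            DocumentEvent event = new DocumentEvent("doc-" + seq);
            for (ShardedHandler handler : handlers) {
                handler.onEvent(event, seq, endOfBatch);
            }
        }
        return handlers;
    }

    public static void main(String[] args) {
        List<ShardedHandler> handlers = run(2, 6, 3);
        System.out.println(handlers.get(0).flushed); // [[doc-0, doc-2], [doc-4]]
        System.out.println(handlers.get(1).flushed); // [[doc-1], [doc-3, doc-5]]
    }
}
```

With the real library, each such handler would need its own event processor (for example one BatchEventProcessor per handler, each with its sequence added as a gating sequence), so that the producer waits only for the slowest shard. Whether this helps depends on what processDocument and processDocumentsList actually do.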