java, multithreading, producer-consumer, disruptor-pattern, lmax

LMAX Disruptor - what determines the batch size?


I have recently been learning about the LMAX Disruptor and doing some experimentation. One thing that puzzles me is the endOfBatch parameter of the onEvent method of EventHandler. Consider the following code. First, the dummy message and consumer classes, which I call Test1 and Test1Worker:

public class Test1 {

}

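import com.lmax.disruptor.EventHandler;

public class Test1Worker implements EventHandler<Test1>{
    @Override
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500); // stand-in for 500 ms of real work per event
        }
        catch(InterruptedException e){
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}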

Notice that I have put in a delay of 500 milliseconds as a substitute for some real-world work. I am also printing to the console the sequence number of each received message and whether it is flagged as the end of a batch.

Then my driver class, DisruptorTest, which acts as the producer:

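import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){
        // one thread for the single event handler
        test1Workers = Executors.newFixedThreadPool(1);

        // ring buffer with 8 slots (the size must be a power of 2)
        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next(); // blocks while the ring buffer is full
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                Test1 message = buf1.get(next); // the pre-allocated event would be filled in here
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next); // a claimed sequence must always be published
            }
        }

        bus1.shutdown();         // waits until all published events have been processed
        test1Workers.shutdown(); // lets the JVM exit once the handler thread is done
    }

    public static class Test1Factory implements EventFactory<Test1> {
        @Override
        public Test1 newInstance() {
            return new Test1();
        }
    }
}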

Here, after initializing everything, I feed 10 messages into the RingBuffer (buffer size 8) and monitor two things: the producer's delay in claiming the next slot in the RingBuffer and, on the consumer side, each message's sequence number along with whether that sequence is treated as the end of a batch.

Interestingly, with the 500 ms delay for processing each message, this is the output I get:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

However, if I remove the 500 ms wait time, this is what I get:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

So it looks like whether a message is considered to be at the end of a batch (i.e., the size of a batch) is influenced by the consumer's message-processing delay. Maybe I am being stupid here, but is that how it should be? What is the reasoning behind it? And what determines the batch size in general? Thanks in advance. Let me know if anything in my question is unclear.


Solution

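  • The batch size is determined solely by the number of events available to the consumer at the moment it checks. Every event published by then is included in the batch. For example, if the Disruptor calls your handler when only one event is available, you get a single onEvent call with endOfBatch=true. If 8 events are available, you get 8 onEvent calls in one batch, and only the last of them has endOfBatch=true. This is why your handler's processing time affects the batch size: while it sleeps, the producer keeps publishing, so in your first run events 1 to 7 piled up and were delivered as one batch, whereas without the sleep the handler mostly keeps up and sees one event at a time.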

    In the code below, waitFor returns the highest sequence that is available, which may be well beyond the next sequence the handler needs. For example, if the consumer has processed sequence 5 and is waiting for 6, and three events (6, 7 and 8) are published before it wakes up, waitFor returns 8 and you receive three onEvent calls (for 6, 7 and 8) in one batch, with endOfBatch=true only for 8.

    https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

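    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
    while (nextSequence <= availableSequence)
    {
        event = dataProvider.get(nextSequence);
        // endOfBatch is true only for the last event that was available when waitFor() returned
        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
        nextSequence++;
    }
    // the consumer's progress becomes visible to the producer only after the whole batch
    sequence.set(availableSequence);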
    

    With regard to the pauses when claiming slots 8 and 9, note that the Disruptor is built around a ring buffer, and you have set the number of slots in the buffer to 8 (see the second constructor argument here):

    bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  
    

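    A producer's next() blocks while the ring buffer is full, i.e. while the slot it wants to reuse still holds an event that at least one consumer has not finished processing. Sequence 8 reuses the slot of sequence 0, so claiming it had to wait for the 500 ms it took to process sequence 0. Sequence 9 reuses the slot of sequence 1, but the BatchEventProcessor only advances the consumer's sequence once it has processed the whole batch, so the producer had to wait for the entire batch 1 to 7 (7 × 500 ms ≈ 3.5 s, in line with the 3519 ms you measured). To avoid these stalls, you could increase the buffer size (it must be a power of 2, e.g. 2^21 = 2,097,152 slots, roughly 2 million) or make sure your consumer keeps up with the producer so the buffer never fills up (e.g. by removing the sleep, as you have already demonstrated).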
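The delivery rule can be tried out without the library. Below is a simplified, single-threaded sketch of the BatchEventProcessor loop quoted in the answer. The `Handler` interface, the `processBatch` method and the `String[]` ring are hypothetical stand-ins, not Disruptor's API. It replays the example of a consumer that has processed sequence 5 while events 6, 7 and 8 are published, so that waitFor(6) would return 8:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchLoopSketch {
    // Hypothetical stand-in for Disruptor's EventHandler (same onEvent parameters, not the real interface).
    interface Handler {
        void onEvent(String event, long sequence, boolean endOfBatch);
    }

    /**
     * Delivers every sequence from nextSequence up to availableSequence, flagging only the
     * last one as endOfBatch, and returns the sequence the consumer would then report as processed.
     */
    static long processBatch(String[] ring, long nextSequence, long availableSequence, Handler handler) {
        while (nextSequence <= availableSequence) {
            // ring size is a power of 2, so masking maps a sequence to its slot
            String event = ring[(int) (nextSequence & (ring.length - 1))];
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return availableSequence; // progress is reported once, after the whole batch
    }

    public static void main(String[] args) {
        String[] ring = new String[8];
        for (int s = 0; s <= 8; s++) {
            ring[s & 7] = "event-" + s; // sequence 8 overwrites the slot of sequence 0
        }
        List<String> calls = new ArrayList<>();
        Handler h = (e, seq, end) -> calls.add(e + ":" + end);
        // Consumer has processed 5; events 6, 7 and 8 are available in one go.
        long processed = processBatch(ring, 6, 8, h);
        System.out.println(calls + " processed=" + processed);
    }
}
```

Running it prints `[event-6:false, event-7:false, event-8:true] processed=8`: three onEvent calls in a single batch, with endOfBatch set only on the last one, and the consumer's progress reported once for all three.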
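To see how the end-of-batch progress update produces both pauses in the first run, here is a small virtual-time model with no threads and no Disruptor. It is a sketch under explicit assumptions: producing is instantaneous, an idle consumer takes everything published so far as soon as something new appears, the consumer acts before the producer when both happen at the same instant, and the consumer reports its progress only at the end of each batch. `RingBufferTimeline` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical virtual-time model of one producer and one batching consumer on a ring buffer.
// Assumptions: instantaneous producer; an idle consumer immediately takes everything published;
// at the same instant the consumer acts first; consumer progress is published only per batch.
public class RingBufferTimeline {
    private final int bufferSize;
    private final int events;
    private final long workMs;
    final List<String> log = new ArrayList<>();

    private long now = 0;
    private long published = -1;     // highest sequence published by the producer
    private long consumerSeq = -1;   // consumer progress as seen by the producer (gating sequence)
    private long batchEnd = -1;      // last sequence of the batch in progress
    private long busyUntil = -1;     // virtual time at which the current batch ends; -1 = idle
    private long nextClaim = 0;      // next sequence the producer wants to claim
    private long claimStartedAt = 0; // when the producer started waiting for nextClaim

    RingBufferTimeline(int bufferSize, int events, long workMs) {
        this.bufferSize = bufferSize;
        this.events = events;
        this.workMs = workMs;
    }

    // Mimics waitFor(): an idle consumer takes everything published so far as one batch.
    private void startBatchIfIdle() {
        if (busyUntil < 0 && published > consumerSeq) {
            batchEnd = published;
            busyUntil = now + (batchEnd - consumerSeq) * workMs;
        }
    }

    List<String> run() {
        while (consumerSeq < events - 1) {
            // The consumer finishes its batch; its progress becomes visible only now.
            if (busyUntil == now) {
                for (long s = consumerSeq + 1; s <= batchEnd; s++) {
                    log.add("seq " + s + " endOfBatch=" + (s == batchEnd));
                }
                consumerSeq = batchEnd;
                busyUntil = -1;
                startBatchIfIdle();
            }
            // Sequence s reuses the slot of s - bufferSize, which must already be consumed.
            while (nextClaim < events && nextClaim - bufferSize <= consumerSeq) {
                log.add("claim " + nextClaim + " waited " + (now - claimStartedAt) + " ms");
                published = nextClaim++;
                claimStartedAt = now;
                startBatchIfIdle();
            }
            now = busyUntil; // jump to the next moment something changes
        }
        return log;
    }

    public static void main(String[] args) {
        new RingBufferTimeline(8, 10, 500).run().forEach(System.out::println);
    }
}
```

Under these assumptions the model logs events in the same order as the first run in the question: claims 0 to 7 wait 0 ms, sequence 0 forms its own batch, claiming 8 waits 500 ms, sequences 1 to 7 form one batch, claiming 9 waits 3500 ms, and 8 and 9 arrive as single-event batches. The model logs a batch's events when the batch finishes rather than one by one, and it does not capture real thread scheduling, so it is not meant to reproduce the no-sleep run.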