disruptor-pattern, lmax

NullPointerException while using TimeoutBlockingWaitStrategy in the Disruptor


I was working with the worker pool example and trying out different wait strategies. When I use the TimeoutBlockingWaitStrategy, I get a NullPointerException. Here are the program and the stack trace.

package org.lmax.experiment.test;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

@SuppressWarnings("unused")
public class MultipleWorkerPoolsTest {

    private static final Logger log = LoggerFactory
            .getLogger(MultipleWorkerPoolsTest.class);

    /** Number of work handlers; each one needs a dedicated thread in the consumer pool. */
    private static final int WORKER_COUNT = 12;

    /** Number of slots in the ring buffer. */
    private static final int RING_BUFFER_SIZE = 64;

    /** Size of the thread pool that runs the publisher tasks. */
    private static final int PUBLISHER_THREADS = 3;

    /** Number of events each publisher task pushes into the ring buffer. */
    private static final int EVENTS_PER_PUBLISHER = 10000;

    /**
     * How long {@link #after()} waits for the consumer threads to exit. A worker may still be
     * inside its simulated work (up to {@link Worker#MAX_WORK_MILLIS}) when it is halted.
     */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 15;

    /** Mutable ring buffer entry carrying a single string payload. */
    private static class StringEvent {
        private String value;

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        /** Returns the payload itself, so the event prints as its value. */
        @Override
        public String toString() {
            return value;
        }
    }

    /**
     * Work handler that reports each event it receives and then simulates work by sleeping
     * for a random time below {@link #MAX_WORK_MILLIS}.
     */
    private static class Worker implements WorkHandler<StringEvent> {

        private static final Logger log = LoggerFactory
                .getLogger(MultipleWorkerPoolsTest.Worker.class);

        /** Exclusive upper bound, in milliseconds, of the simulated work time per event. */
        private static final int MAX_WORK_MILLIS = 10000;

        private final String workerId;
        private final Random r = new Random();

        public Worker(String workerId) {
            this.workerId = workerId;
        }

        /**
         * Prints and logs the received event, sleeps for a random interval, then prints a
         * completion line.
         *
         * @param event the event claimed by this worker
         * @throws Exception an {@link InterruptedException} if the thread is interrupted
         *                   while sleeping
         */
        @Override
        public void onEvent(StringEvent event) throws Exception {
            System.out.println("{" + workerId + "} got {" + event + "}");
            log.info("{} got {}", workerId, event);
            int timeToSleep = r.nextInt(MAX_WORK_MILLIS);
            Thread.sleep(timeToSleep);
            System.out.println("{" + workerId + "} Completed {" + event + "}");
        }
    }

    /** Pre-allocates the {@link StringEvent} instances that fill the ring buffer. */
    private static class StringEventFactory implements EventFactory<StringEvent> {

        @Override
        public StringEvent newInstance() {
            return new StringEvent();
        }
    }

    /** Writes "event &lt;sequence&gt;: &lt;argument&gt;" into the claimed event. */
    private static class StringEventTranslator implements EventTranslatorOneArg<StringEvent, String> {

        @Override
        public void translateTo(StringEvent event, long sequence, String arg0) {
            event.setValue("event " + sequence + ": " + arg0);
        }
    }

    private Disruptor<StringEvent> disruptor;
    private ExecutorService executor;

    /**
     * Builds a multi-producer disruptor whose events are consumed by a single pool of
     * {@link #WORKER_COUNT} workers running on daemon threads.
     */
    @Before
    public void before() {
        executor = Executors.newFixedThreadPool(WORKER_COUNT, new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("worker_1_" + t.getId());
                return t;
            }
        });

        // NOTE: the NullPointerException at WorkProcessor.run reported with this wait strategy
        // is a Disruptor bug in 3.3.4 and earlier (fixed in 3.3.5), not a fault in this test.
        // The strategy and its timeout are left as they were so the reproduction is unchanged.
        disruptor = new Disruptor<StringEvent>(new StringEventFactory(), RING_BUFFER_SIZE, executor,
                ProducerType.MULTI, new TimeoutBlockingWaitStrategy(1000, TimeUnit.NANOSECONDS));

        Worker[] workerArray = new Worker[WORKER_COUNT];
        for (int i = 0; i < workerArray.length; i++) {
            workerArray[i] = new Worker("worker_" + i);
        }

        // Further pools can be chained with .thenHandleEventsWithWorkerPool(...).
        disruptor.handleEventsWithWorkerPool(workerArray);
    }

    /**
     * Stops the event processors and the consumer thread pool.
     *
     * <p>{@code executor.shutdown()} alone never ends the processors because they are
     * long-running tasks, so they are halted explicitly first. This also covers a test
     * that failed before reaching {@code disruptor.shutdown()}.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    @After
    public void after() throws InterruptedException {
        if (disruptor != null) {
            disruptor.halt();
        }
        executor.shutdown();
        if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            log.warn("Consumer threads still running {} s after shutdown", SHUTDOWN_TIMEOUT_SECONDS);
        }
    }

    /**
     * Publishes events from two concurrent publishers, one at full speed and one pausing
     * 10 ms after every event, then shuts the disruptor down.
     *
     * @throws InterruptedException if interrupted while waiting for a publisher
     * @throws ExecutionException   if a publisher task fails
     */
    @Test
    public void test1() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(PUBLISHER_THREADS);
        try {
            disruptor.start();

            Future<?> future = executorService.submit(newPublisher(0));
            Future<?> future1 = executorService.submit(newPublisher(10));
            future1.get();
            future.get();
        } finally {
            // Both publishers have finished on the success path; on failure this interrupts them
            // so their non-daemon threads do not outlive the test.
            executorService.shutdownNow();
        }

        System.out.println("Disruptor shutting down");
        log.info("Disruptor shutting down");
        // shutdown() itself blocks until every published event has been processed, so no
        // fixed sleep is needed before calling it.
        disruptor.shutdown();
        System.out.println("Disruptor shut down complete");

        log.info("Disruptor shutdown");
    }

    /**
     * Creates a task that publishes {@link #EVENTS_PER_PUBLISHER} events named after the
     * thread that runs it.
     *
     * @param pauseMillis pause after each published event, in milliseconds; 0 for no pause
     * @return the publisher task
     */
    private Runnable newPublisher(final long pauseMillis) {
        return new Runnable() {
            @Override
            public void run() {
                StringEventTranslator translator = new StringEventTranslator();
                RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
                Thread current = Thread.currentThread();
                String eventName = "hello" + current.getName() + "." + current.getId();

                for (int i = 0; i < EVENTS_PER_PUBLISHER && !current.isInterrupted(); i++) {
                    ringBuffer.publishEvent(translator, eventName);
                    if (pauseMillis > 0) {
                        try {
                            Thread.sleep(pauseMillis);
                        } catch (InterruptedException e) {
                            // Restore the flag and stop publishing instead of swallowing it.
                            current.interrupt();
                            return;
                        }
                    }
                }
            }
        };
    }
}

The error is as follows:

  Exception in thread "worker_1_9" Exception in thread "worker_1_12" Exception in thread "worker_1_16" Exception in thread "worker_1_20" Exception in thread "worker_1_17" Exception in thread "worker_1_14" java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_10" Exception in thread "worker_1_19" Exception in thread "worker_1_11" Exception in thread "worker_1_18" java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_13" java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_15" java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I am wondering whether there is a bug in my code. Any help is really appreciated. Also, it would be great if someone could explain the purpose of TimeoutBlockingWaitStrategy in the Disruptor.


Solution

  • This is a bug in Disruptor 3.3.4 and earlier versions; it will be fixed in 3.3.5.