I was working with the WorkerPool example and trying out different WaitStrategy implementations. When I use the TimeoutBlockingWaitStrategy, I get an error. Here are the program and the call stack.
package org.lmax.experiment.test;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@SuppressWarnings("unused")
public class MultipleWorkerPoolsTest {
private static final Logger log = LoggerFactory
.getLogger(MultipleWorkerPoolsTest.class);
/**
 * Mutable event carried through the ring buffer; it holds a single string payload.
 */
private static class StringEvent {
    /** Payload text; {@code null} until {@link #setValue(String)} is called. */
    private String value;

    /**
     * @return the current payload, or {@code null} if none has been set
     */
    public String getValue() {
        return value;
    }

    /**
     * @param value the payload to store in this event
     */
    public void setValue(String value) {
        this.value = value;
    }

    /**
     * @return the raw payload, so the event can be dropped straight into log messages
     */
    @Override
    public String toString() {
        return value;
    }
}
/**
 * Work handler that simulates a slow consumer: it reports the event it received,
 * sleeps for a random time, and then reports completion.
 */
private static class Worker implements WorkHandler<StringEvent> {
    private static final Logger log = LoggerFactory
            .getLogger(MultipleWorkerPoolsTest.Worker.class);

    /** Exclusive upper bound, in milliseconds, of the simulated processing time. */
    private static final int MAX_SLEEP_MILLIS = 10000;

    /** Label used to identify this worker in the output. */
    private final String workerId;

    /** Source of the random processing delay. */
    private final Random random = new Random();

    /**
     * @param workerId label printed with every event this worker handles
     */
    public Worker(String workerId) {
        this.workerId = workerId;
    }

    /**
     * Prints and logs the received event, sleeps for a random duration below
     * {@link #MAX_SLEEP_MILLIS}, then prints a completion message.
     *
     * @param event the event handed to this worker
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public void onEvent(StringEvent event) throws Exception {
        System.out.println("{" + workerId + "} got {" + event + "}");
        log.info("{} got {}", workerId, event);
        // A primitive int avoids a pointless box/unbox round trip before Thread.sleep.
        int timeToSleep = random.nextInt(MAX_SLEEP_MILLIS);
        Thread.sleep(timeToSleep);
        System.out.println("{" + workerId + "} Completed {" + event + "}");
    }
}
/**
 * Factory handed to the {@code Disruptor} constructor; every call yields a new,
 * empty {@link StringEvent}.
 */
private static class StringEventFactory implements EventFactory<StringEvent> {
    /**
     * @return a freshly constructed event whose payload has not been set
     */
    @Override
    public StringEvent newInstance() {
        final StringEvent blank = new StringEvent();
        return blank;
    }
}
/**
 * Translator that fills a {@link StringEvent} with a payload of the form
 * {@code "event <sequence>: <argument>"}.
 */
private static class StringEventTranslator implements EventTranslatorOneArg<StringEvent, String> {
    /**
     * @param event    the event whose payload is overwritten
     * @param sequence the sequence number embedded in the payload text
     * @param arg0     the caller-supplied text appended after the sequence number
     */
    @Override
    public void translateTo(StringEvent event, long sequence, String arg0) {
        final StringBuilder payload = new StringBuilder("event ");
        payload.append(sequence).append(": ").append(arg0);
        event.setValue(payload.toString());
    }
}
private Disruptor<StringEvent> disruptor;
private ExecutorService executor;
/**
 * Creates the worker thread pool and a multi-producer Disruptor (ring size 64), then
 * registers twelve {@link Worker}s with it as a single worker pool.
 *
 * <p>The {@code TimeoutBlockingWaitStrategy} is kept on purpose: it is the
 * configuration under investigation. Combined with a worker pool it was reported to
 * make the worker threads die with a {@code NullPointerException} in
 * {@code WorkProcessor.run} on Disruptor 3.3.4 and earlier (reported as fixed in 3.3.5).
 */
@Before
public void before() {
    // One thread per worker, so the same count sizes both the pool and the array.
    final int workerCount = 12;

    executor = Executors.newFixedThreadPool(workerCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("worker_1_" + t.getId());
            return t;
        }
    });

    disruptor = new Disruptor<StringEvent>(new StringEventFactory(), 64, executor,
            ProducerType.MULTI, new TimeoutBlockingWaitStrategy(1000, TimeUnit.NANOSECONDS));

    Worker[] workerArray = new Worker[workerCount];
    for (int i = 0; i < workerArray.length; i++) {
        workerArray[i] = new Worker("worker_" + i);
    }
    disruptor.handleEventsWithWorkerPool(workerArray);
}
/**
 * Shuts the worker executor down after each test.
 *
 * <p>A zero timeout makes {@code awaitTermination} return immediately, so this waits
 * for a bounded period instead and falls back to {@code shutdownNow()} if the worker
 * threads have not finished by then.
 *
 * @throws InterruptedException if interrupted while waiting for the workers to stop
 */
@After
public void after() throws InterruptedException {
    executor.shutdown();
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        // Workers are still running (for example, mid-sleep): interrupt them.
        executor.shutdownNow();
    }
}
/**
 * Starts the disruptor, publishes 10000 events from each of two concurrent producers
 * (one flat out, one pausing 10 ms between events), waits for both producers to
 * finish, and then shuts the disruptor down.
 *
 * @throws InterruptedException if interrupted while waiting
 * @throws ExecutionException   if a producer task fails
 */
@Test
public void test1() throws InterruptedException, ExecutionException {
    final int eventsPerProducer = 10000;
    // Long pause kept from the original test so worker output can be observed
    // before the disruptor is shut down.
    final long observationMillis = 1000000L;

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    try {
        disruptor.start();

        Future<?> future = executorService.submit(newProducer(eventsPerProducer, 0L));
        Future<?> future1 = executorService.submit(newProducer(eventsPerProducer, 10L));
        future1.get();
        future.get();

        System.out.println("Disruptor shutting down");
        log.info("Disruptor shutting down");
        Thread.sleep(observationMillis);
        disruptor.shutdown();
        System.out.println("Disruptor shut down complete");
        log.info("Disruptor shutdown");
    } finally {
        // The producer pool uses non-daemon threads; always release them.
        executorService.shutdownNow();
    }
}

/**
 * Builds a producer task that publishes {@code eventCount} events to the ring buffer.
 * Each event carries the text {@code "hello" + threadName + "." + threadId} of the
 * thread that runs the task.
 *
 * @param eventCount  number of events to publish
 * @param pauseMillis pause after each publish, in milliseconds; {@code 0} means no pause
 * @return the producer task
 */
private Runnable newProducer(final int eventCount, final long pauseMillis) {
    return new Runnable() {
        @Override
        public void run() {
            StringEventTranslator translator = new StringEventTranslator();
            RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
            Thread current = Thread.currentThread();
            String eventName = "hello" + current.getName() + "." + Long.toString(current.getId());
            for (int i = 0; i < eventCount; i++) {
                ringBuffer.publishEvent(translator, eventName);
                if (pauseMillis > 0L) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop publishing instead of
                        // swallowing the interruption and carrying on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    };
}
}
The error is as follows:
Exception in thread "worker_1_9" Exception in thread "worker_1_12" Exception in thread "worker_1_16" Exception in thread "worker_1_20" Exception in thread "worker_1_17" Exception in thread "worker_1_14" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_10" Exception in thread "worker_1_19" Exception in thread "worker_1_11" Exception in thread "worker_1_18" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_13" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "worker_1_15" java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:156)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I'm wondering whether there is a bug in my code; any help is really appreciated. Also, it would be great if someone could explain the purpose of TimeoutBlockingWaitStrategy in the Disruptor.
This is a bug in version 3.3.4 and earlier versions of the Disruptor; it will be fixed in 3.3.5.