java multithreading threadpoolexecutor

How to implement blocking thread pool executor?


We have a large text file in which each line requires intensive processing. The design is to have a class that reads the file and delegates the processing of each line to a thread via a thread pool. The file reader should be blocked from reading the next line whenever there is no free thread in the pool to do the processing, so I need a blocking thread pool.

In the current implementation, ThreadPoolExecutor.submit() and ThreadPoolExecutor.execute() throw a RejectedExecutionException once the configured number of threads are busy and the queue is full, as shown in the code snippet below.

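import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingTp {

    public static void main(String[] args) {
        // bounded queue: at most 3 jobs can wait for a free thread
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
        // 1 core thread, at most 3 threads, idle extra threads die after 30s
        ThreadPoolExecutor executorService =
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
        int jobs = 10;
        System.out.println("Starting application to add " + jobs + " jobs");
        for (int i = 1; i <= jobs; i++) {
            try {
                executorService.submit(new WorkerThread(i));
                System.out.println("Added job #" + i);
            } catch (RejectedExecutionException e) {
                System.err.println("RejectedExecutionException");
            }
        }
        // let accepted jobs finish, then allow the JVM to exit
        executorService.shutdown();
    }
}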

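class WorkerThread implements Runnable {
    private final int job;
    public WorkerThread(int job) {
        this.job = job;
    }
    @Override
    public void run() {
        try {
            // simulate one second of intensive work
            Thread.sleep(1000);
        } catch (InterruptedException excep) {
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }
}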

The output of the above program is:

Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException

Can someone shed some light on how I can implement a blocking thread pool?


Solution

  • Can someone shed some light on how I can implement a blocking thread pool?

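    You need to set a rejected execution handler on your executor service. When the pool rejects a job because all of its threads are busy and the queue is full, the handler puts the job into the queue itself, which blocks the submitting thread until there is space in the blocking queue.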

    BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    ThreadPoolExecutor executorService =
         new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
    // when the pool rejects a job, this handler puts it into the queue, which blocks while the queue is full
    executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // block until there's room
                executor.getQueue().put(r);
                // check afterwards and throw if pool shutdown
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Producer interrupted", e);
            }
        }
    });
    

    So instead of the ThreadPoolExecutor throwing a RejectedExecutionException, it calls the rejection handler, which in turn tries to put the job back on the queue. This blocks the caller until the queue has room.
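
To see the handler in action end to end, here is a minimal self-contained sketch that combines it with the 10-job loop from the question. The class name BlockingSubmitDemo, the helper methods newBlockingPool and runJobs, and the 100 ms sleep that stands in for the real per-line work are all made up for illustration; the pool settings (1 core thread, 3 max threads, queue of 3) are the ones used above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and timings are assumptions for illustration.
class BlockingSubmitDemo {

    // Builds the pool from the answer: 1 core thread, 3 max threads, queue of 3.
    static ThreadPoolExecutor newBlockingPool() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(3);
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, queue);
        pool.setRejectedExecutionHandler((task, executor) -> {
            try {
                // block the submitting thread until the queue has room
                executor.getQueue().put(task);
                // check afterwards and throw if the pool was shut down meanwhile
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException(
                        "Task " + task + " rejected from " + executor);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Producer interrupted", e);
            }
        });
        return pool;
    }

    // Submits the given number of jobs and returns how many ran to completion.
    static int runJobs(int jobs, long sleepMillis) {
        ThreadPoolExecutor pool = newBlockingPool();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 1; i <= jobs; i++) {
            // execute() blocks here (inside the handler) once the pool is saturated
            pool.execute(() -> {
                try {
                    Thread.sleep(sleepMillis); // stand-in for the real work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // already-accepted jobs still run after shutdown()
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        int completed = runJobs(10, 100);
        System.out.println("Completed " + completed + " of 10 jobs");
    }
}
```

With the handler installed, none of the 10 submissions is rejected: the loop simply pauses on the seventh and later jobs until a worker takes something off the queue. One thing to keep in mind is that the handler bypasses the executor's own bookkeeping by writing to the queue directly, so the shutdown check after put() is what prevents a job from being silently accepted by a pool that has already been shut down.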