javamultithreadingconcurrencythreadpool

Creating my own custom thread pool executor


I ran into an issue here while creating my own custom thread pool executor. I am trying to implement the same without making use of any executor service library in order to prepare for a java interview. I am able to code below service.

import java.util.LinkedList;
import java.util.Queue;

class Worker extends Thread {
    private final Queue<Runnable> taskQueue;
    private volatile boolean isStopped = false;

    public Worker(Queue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void run() {
        while (!isStopped) {
            Runnable task = null;
            synchronized (taskQueue) {
                while (taskQueue.isEmpty() && !isStopped) {
                    try {
                        taskQueue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if (!taskQueue.isEmpty()) {
                    task = taskQueue.poll();
                }
            }
            if (task != null) {
                task.run();
            }
            if(isStopped)
            return;
        }
    }

    public void stopThread() {
        isStopped = true;
        interrupt(); // Interrupt the thread if it's waiting
    }
}

public class CustomThreadPool {
    private final int poolSize;
    private final Queue<Runnable> taskQueue;
    private final Worker[] workers;

    public CustomThreadPool(int poolSize) {
        this.poolSize = poolSize;
        taskQueue = new LinkedList<>();
        workers = new Worker[poolSize];
        for (int i = 0; i < poolSize; i++) {
            workers[i] = new Worker(taskQueue);
            workers[i].start();
        }
    }

    public void execute(Runnable task) {
        synchronized (taskQueue) {
            taskQueue.offer(task);
            taskQueue.notifyAll(); // Notify all waiting threads to start executing the task
        }
    }

    public void shutdown() {
        for (Worker worker : workers) {
            worker.stopThread();
        }
    }

    public static void main(String[] args) {
        CustomThreadPool threadPool = new CustomThreadPool(3);

        // Submit tasks to the thread pool
        for (int i = 1; i <= 5; i++) {
            int num = i;
            threadPool.execute(() -> {
                int result = calculateFactorial(num);
                System.out.println("Factorial of " + num + " is " + result + " - Thread: " + Thread.currentThread().getName());
            });
        }

        // Shutdown the thread pool
        threadPool.shutdown();
    }

    private static int calculateFactorial(int n) {
        int factorial = 1;
        for (int i = 1; i <= n; i++) {
            factorial *= i;
        }
        return factorial;
    }
}

Below is the output of the code above.

Factorial of 1 is 1 - Thread: Thread-0
Factorial of 2 is 2 - Thread: Thread-1
Factorial of 3 is 6 - Thread: Thread-2

Well the problem here is that I am submitting 5 tasks to the executor , i can see the same in the taskQueue as well, but only 3/5 tasks are being executed as the threadpool size is kept 3. I want these 3 threads to run those 5 tasks in someway. I need help to understand what i am doing incorrectly and how to force the same 3 threads to execute all the provisioned tasks in the queue.


Solution

  • Based on this answer, I would modify the run() method like this:

    public void run() 
    {
      while( true )
      {
        Runnable task = null;
        synchronized( taskQueue )
        {
          if( isStopped && taskQueue.isEmpty() ) break;
          if( taskQueue.isEmpty() )
          {
            try 
            {
              taskQueue.wait();
            } 
            catch( final InterruptedException e ) 
            {
              e.printStackTrace();
            }
          }
          if( !taskQueue.isEmpty() ) task = taskQueue.poll();
        }
        if( task != null ) task.run();
      }
    }