java multithreading runnable scheduledexecutorservice

Make a single-threaded ScheduledExecutorService spawn independent, non-blocking Threads


I'm facing a problem with what the title suggests.

I have a @Component that monitors "Processes". Once the first Process is registered in the component, I initialize my ScheduledExecutorService:

ScheduledExecutorService pollingThread = Executors.newSingleThreadScheduledExecutor();

With pollingThread.scheduleAtFixedRate(monitorProcesses, 60, 60, TimeUnit.SECONDS) I schedule the Runnable "monitorProcesses" to run every 60 seconds. It polls a third-party API and loops over each registered Process in my component to check whether it has finished.

If a process finishes, I want a new Thread to do some logic without blocking the "polling" Thread. Here is a code snippet.

/**
 * Starts the monitoring of the processes.
 */
private void startMonitoring() {

    pollingThread = Executors.newSingleThreadScheduledExecutor();

    Runnable monitorProcesses = () -> {

        // 1. Get processes queue
        Map<String, ProcessStatus> queue;
        do {
            try {
                queue = getProcessesStatusQueue(); // API request
            } catch (Exception e) {
                logger.warn("ProcessMonitor >> Unable to get processes queue. Trying again in the next cycle.");
                return;
            }
        } while (queue == null);

        // 2. Loop registered processes and check for changes in their status
        for (Process process : getRegisteredProcesses()) {
            try {
                ProcessStatus processStatus = queue.get(process.getId());
                syncProcess(process, processStatus);
            } catch (Exception e) {
                onUnexpectedError(process, e);
            }
        }

        // 3. Check if all processes finished
        if (getRegisteredProcesses().isEmpty()) { // synchronized method
            stopMonitoring(); // executes pollingThread.shutdown() and some more stuff
        }
    };

    // Start polling
    pollingThread.scheduleAtFixedRate(monitorProcesses, 60, 60, TimeUnit.SECONDS);

    isMonitoringActive = true;
}

private void syncProcess(
        @NotNull Process process, ProcessStatus processStatus) {

    //
    // some logic
    //

    boolean condition = checkProcessEnded(process, processStatus);
    if (condition) {
        new Thread(() -> processService.onProcessesEnded(process)).start();
    }

}

The problem I'm facing: when the first Process finishes, and a new Thread is therefore created to execute processService.onProcessesEnded(process), loop 2 in my polling task doesn't continue and the "polling thread" stops. For example, if I have 10 processes and the first one ends, the remaining 9 are never processed.

I don't see any exceptions being thrown in my logs.

Any help would be highly appreciated!


Solution

  • So, looks like I've solved my concurrency issue. It wasn't about the Thread creation. The problem occurred when processes were registered while the polling thread was iterating over the HashMap where I was storing my registered processes.

    You either have to use a ConcurrentHashMap, which is a Map adapted for concurrency and can handle insertions while iterating over it, or synchronize the loop block and the methods where you register the elements of the map, so no elements are inserted in the HashMap while it's being iterated over:

    Synchronize example with HashMap

    Map<String, Process> registeredProcesses = new HashMap<>();
    
    private void startMonitoring() {
    
        ...
        ...
    
        // Lock map
        synchronized (registeredProcesses) {
            // 2. Loop registered processes and check for changes in their status
            for (Process process : registeredProcesses.values()) {
                try {
                    ProcessStatus processStatus = queue.get(process.getId());
                    syncProcess(process, processStatus);
                } catch (Exception e) {
                    onUnexpectedError(process, e);
                }
            }
        }
    
        ...
        ...
    
    }
    
    private void register(Process process) {
        synchronized (registeredProcesses) {
            registeredProcesses.put(process.getId(), process);
        }
    }
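
For comparison, here is a minimal sketch of the ConcurrentHashMap alternative. The class name ConcurrentRegistrySketch, the String values and the pollOnce() method are hypothetical stand-ins, not the real component; the point is only that registering while iterating no longer fails, because ConcurrentHashMap iterators are weakly consistent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the monitoring component; only the map handling is shown.
final class ConcurrentRegistrySketch {

    // Iterators over a ConcurrentHashMap are weakly consistent: they never throw
    // ConcurrentModificationException, even if entries are added during iteration.
    private final Map<String, String> registeredProcesses = new ConcurrentHashMap<>();

    // Safe to call from any thread without an explicit lock.
    void register(String processId, String description) {
        registeredProcesses.put(processId, description);
    }

    int size() {
        return registeredProcesses.size();
    }

    // Simulates one polling cycle during which a new process gets registered.
    List<String> pollOnce() {
        List<String> visited = new ArrayList<>();
        for (String processId : registeredProcesses.keySet()) {
            visited.add(processId);
            // Structural modification mid-iteration; a plain HashMap would fail here.
            register("late-arrival", "registered mid-iteration");
        }
        return visited;
    }

    public static void main(String[] args) {
        ConcurrentRegistrySketch registry = new ConcurrentRegistrySketch();
        registry.register("p1", "first");
        registry.register("p2", "second");
        System.out.println("visited: " + registry.pollOnce());
        System.out.println("registered: " + registry.size());
    }
}
```

Whether the entry added mid-iteration is visited in the same pass is not guaranteed; in a polling setup it would simply be picked up in the next cycle.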
    

    I've chosen the ConcurrentHashMap option, because I don't want the threads that register processes to block.
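
This probably also explains why the polling thread stopped without anything in the logs. In the original Runnable the try/catch sits inside the loop body, so an exception thrown by the iterator itself (presumably a ConcurrentModificationException) escapes the task. scheduleAtFixedRate then suppresses all further executions, and the exception is only stored in the returned ScheduledFuture. The sketch below reproduces that behaviour in isolation; SilentStopDemo, RUNS and runUntilFailure() are names made up for the demo, and the modification is done from the same thread to make the failure deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original component.
final class SilentStopDemo {

    // Counts how many times the periodic task actually started.
    static final AtomicInteger RUNS = new AtomicInteger();

    // Schedules a task that modifies a HashMap while iterating over it and
    // returns the exception that ended the periodic schedule.
    static Throwable runUntilFailure() {
        RUNS.set(0);
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        Map<String, String> registered = new HashMap<>();
        registered.put("p1", "running");
        registered.put("p2", "running");

        Runnable task = () -> {
            RUNS.incrementAndGet();
            for (String id : registered.keySet()) {
                // Structural modification during iteration: the next iterator
                // step throws ConcurrentModificationException.
                registered.put("late-" + id, "running");
            }
        };

        ScheduledFuture<?> future =
                poller.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            // For a periodic task, get() only completes via exception or cancellation.
            future.get(5, TimeUnit.SECONDS);
            throw new IllegalStateException("periodic task completed normally");
        } catch (ExecutionException e) {
            return e.getCause(); // the exception that was never logged
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException("task did not fail as expected", e);
        } finally {
            poller.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Throwable cause = runUntilFailure();
        System.out.println("task stopped by: " + cause.getClass().getName());
        System.out.println("executions: " + RUNS.get());
    }
}
```

Wrapping the whole body of the scheduled Runnable in a try/catch that logs the exception, or checking the returned ScheduledFuture, would make a failure like this visible instead of silently ending the schedule.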