java, out-of-memory, executorservice, runnable

How to prevent out-of-memory errors caused by too many Runnables


I need to run 100 tasks. I want to run them in parallel, but only 10 at a time. For that I can simply use

ExecutorService service = Executors.newFixedThreadPool(10);

then create 100 Runnables and submit them to the ExecutorService.

But what if there are 100_000 or 100_000_000 tasks? At some point, holding that many Runnables in memory would cause an OutOfMemoryError.

So I wanted something like this: I store the parameters of all tasks in a database, and whenever a thread becomes free, I take the next set of parameters from the database and start a new task with them. I came up with something like this:

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    private static class MyRunnable implements Runnable {

        private int i;  /* the task's real parameters would go here */

        public MyRunnable(int i) {
            this.i = i;
        }

        public void run() {
            try {
                System.out.println(i); // the real work would go here
                Thread.sleep(1_000);   // the real work would go here
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                executorService.execute(new MyRunnable(i + 10 /* the real code would fetch the next task's parameters from the database */));
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            executorService.submit(new MyRunnable(i /* the real code would fetch the task's parameters from the database */));
        }
    }
}

Is this a good or a bad solution? What would be the best solution?


Solution

  • You seem to be concerned that too many pending Runnable task objects could take more memory than you will have available on your deployment machine. You want to avoid out-of-memory errors or virtual memory thrashing.

    To avoid too many tasks, wait to instantiate the next task until the current one is done.

    End each task with submission of another task

    Add a second duty to each instance of your Runnable objects: Pull the next pending parameter from your database, and submit a task to the executor service.

    The task can be either a new instance of your Runnable implementation, or the task can be the same currently-executing Runnable instance. I prefer the first approach if the second approach would mean writing code to clean “dirty” state.
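
    As a rough sketch of the second approach, the task below resubmits itself (this) instead of creating a successor. The names ReusableDoubler and runAll, and the CountDownLatch used to detect completion, are assumptions made for this illustration only. The field named current stands in for "dirty" state that has to be cleaned before the instance runs again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ResubmitSelfDemo
{
    // Hypothetical task that resubmits itself rather than creating a successor.
    static class ReusableDoubler implements Runnable
    {
        private final Queue<Integer> inputs;
        private final Queue<Integer> results;
        private final ExecutorService executorService;
        private final CountDownLatch chainsDone;
        private Integer current;  // Per-run state that must be reset between runs.

        ReusableDoubler(Queue<Integer> inputs, Queue<Integer> results,
                        ExecutorService executorService, CountDownLatch chainsDone)
        {
            this.inputs = inputs;
            this.results = results;
            this.executorService = executorService;
            this.chainsDone = chainsDone;
        }

        @Override
        public void run()
        {
            current = inputs.poll();
            if (current == null)
            {
                chainsDone.countDown();  // No inputs remain, so this chain ends here.
                return;
            }
            try
            {
                results.add(Math.multiplyExact(current, 2));  // Stand-in for real work.
            }
            finally
            {
                current = null;                 // Clean the state before reuse.
                executorService.execute(this);  // Queue the very same instance again.
            }
        }
    }

    // Runs one self-resubmitting chain per thread and returns the sorted results.
    static List<Integer> runAll(List<Integer> data, int threadCount)
    {
        Queue<Integer> inputs = new ConcurrentLinkedQueue<>(data);
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch chainsDone = new CountDownLatch(threadCount);
        try
        {
            for (int i = 0; i < threadCount; i++)
            {
                executorService.execute(new ReusableDoubler(inputs, results, executorService, chainsDone));
            }
            chainsDone.await();  // Every chain has seen an empty queue and stopped.
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        }
        finally
        {
            executorService.shutdown();
        }
        List<Integer> sorted = new ArrayList<>(results);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args)
    {
        // Prints [20, 40, 60, 80, 100, 120, 140]
        System.out.println(runAll(List.of(10, 20, 30, 40, 50, 60, 70), 3));
    }
}
```

    In this sketch only as many task objects exist as there are threads, no matter how many inputs are waiting. The trade-off is the extra care needed to reset fields such as current, which is why creating a fresh instance is often the simpler choice.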

    When no more input parameters remain, each running instance of your Runnable simply completes without submitting a successor. The executor service is then left with no further work to do and can be shut down.

    Be sure to eventually shut down your executor service. Otherwise its backing thread pool may continue running indefinitely, even after your app ends, like a zombie 🧟‍♂️.
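
    One common way to shut down, sketched here under the assumption that all task chains have already finished, is to call shutdown and then awaitTermination, falling back to shutdownNow if the pool does not stop in time. The helper name shutdownAndAwait and the timeout values are made up for this illustration. With tasks that submit their own successors, calling shutdown too early would cause those later submissions to be rejected with a RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo
{
    // Hypothetical helper: returns true if the pool terminated within the timeouts.
    static boolean shutdownAndAwait(ExecutorService executorService, long timeoutSeconds)
    {
        executorService.shutdown();  // Stop accepting new tasks; already-queued tasks still run.
        try
        {
            if (executorService.awaitTermination(timeoutSeconds, TimeUnit.SECONDS))
            {
                return true;
            }
            executorService.shutdownNow();  // Interrupt running tasks and drop queued ones.
            return executorService.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();  // Preserve the interrupt status for callers.
            return false;
        }
    }

    public static void main(String[] args)
    {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++)
        {
            executorService.execute(completed::incrementAndGet);
        }
        boolean terminated = shutdownAndAwait(executorService, 10);
        // Prints: terminated = true, completed = 5
        System.out.println("terminated = " + terminated + ", completed = " + completed.get());
    }
}
```

    This variant also works on Java versions older than 19, where ExecutorService does not yet have a close method.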

    Example code

    Task

    Let's define a task that multiplies a number, then reports on the console.

    This task’s constructor takes a thread-safe Queue of integers which it uses as input. Each task object draws one element, an Integer object, from that queue.

    This task’s constructor also takes a reference to the existing ExecutorService. When this task finishes its work, it then instantiates another task. The old task submits the new task to that passed executor service. The executor service will execute the new task in the future, likely the very near future.

    package work.basil.example.bogus;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Objects;
    import java.util.Queue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadLocalRandom;
    
    public class Multiplier implements Runnable
    {
        private final Queue < Integer > queue;
        private final ExecutorService executorService;
    
        public Multiplier ( final Queue < Integer > queue , final ExecutorService executorService )
        {
            this.queue = queue;
            this.executorService = executorService;
        }
    
        @Override
        public void run ( )
        {
            try
            {
                this.doRun();
            }
            catch ( Exception e )
            {
                e.printStackTrace();
            }
        }
    
        private void doRun ( )
        {
            Integer input = queue.poll();
            if ( Objects.nonNull( input ) )  // If we have another input to process.
            {
                // Business logic.
                // Simulate doing much work by sleeping this thread.
                // Note: Thread.sleep( Duration ) requires Java 19 or later.
                try { Thread.sleep( Duration.ofMillis( ThreadLocalRandom.current().nextInt( 500 , 2_000 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
                int result = Math.multiplyExact( input , 2 );
                System.out.println( "INFO - input = " + input + " | result = " + result + " at " + Instant.now() );
    
                // Schedule another task.
                // Avoid submitting a task when no input remains. An unneeded task may still slip through
                // because of how concurrent threads interleave. Such a task finds the queue empty and returns.
                if ( ! this.queue.isEmpty() )
                {
                    Multiplier task = new Multiplier( this.queue , this.executorService );
                    try
                    {
                        this.executorService.submit( task );
                        // Report success only if the submission was actually accepted.
                        System.out.println( "DEBUG - Submitted a subsequent task to executor service. " + Instant.now() );
                    }
                    catch ( RejectedExecutionException e ) { System.out.println( "e = " + e ); }
                }
            }
        }
    }
    

    App

    Let's write an app to run a few of those tasks.

    This app instantiates an executor service backed by a thread pool of a certain size. The app instantiates and submits as many tasks as there are threads, to get the ball rolling.

    From there, each executing task instantiates another task, then submits it for later execution.

    This rolling work-instantiate-submit/work-instantiate-submit cycle continues until we exhaust the queue of inputs.

    package work.basil.example.bogus;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    
    public class App
    {
        public static void main ( String[] args )
        {
            App app = new App();
            app.demo();
        }
    
        private void demo ( )
        {
            System.out.println( "INFO - Demo starting. " + Instant.now() );
    
            final List < Integer > data = List.of( 10 , 20 , 30 , 40 , 50 , 60 , 70 );
            final Queue < Integer > queue = new ArrayBlockingQueue <>( data.size() , false , data );
            final int countThreads = 3;
            ExecutorService executorService = Executors.newFixedThreadPool( countThreads );
    
            // To get the ball rolling, start as many tasks as we have threads.
            // From there, those initial tasks will instantiate and submit further tasks.
            for ( int index = 0 ; index < countThreads ; index++ )
            {
                Multiplier task = new Multiplier( queue , executorService );
                try { executorService.submit( task ); } catch ( RejectedExecutionException e ) { System.out.println( "e = " + e ); }
            }
    
            Duration wait = Duration.ofSeconds( 20 );
            System.out.println( "DEBUG - Main thread will sleep for " + wait + " at " + Instant.now() );
            try { Thread.sleep( wait ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
            System.out.println( "DEBUG - Main thread waking after waiting " + wait + " at " + Instant.now() );
    
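            // ExecutorService.close() requires Java 19 or later. It initiates an orderly shutdown and waits for termination.
            executorService.close();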
            if ( ! executorService.isTerminated() )
            {
                System.out.println( "ERROR - Executor service STILL not yet terminated after waiting for " + wait + " at " + Instant.now() );
            }
    
            System.out.println( "INFO - Demo ending. " + Instant.now() );
        }
    }
    

    Execution

    When run, we expect to see output similar to this:

    INFO - Demo starting. 2023-02-23T05:51:19.584253Z
    DEBUG - Main thread will sleep for PT20S at 2023-02-23T05:51:19.592739Z
    INFO - input = 30 | result = 60 at 2023-02-23T05:51:20.542284Z
    DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:20.549764Z
    INFO - input = 20 | result = 40 at 2023-02-23T05:51:20.571571Z
    DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:20.571752Z
    INFO - input = 40 | result = 80 at 2023-02-23T05:51:21.180109Z
    DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:21.180650Z
    INFO - input = 10 | result = 20 at 2023-02-23T05:51:21.220425Z
    DEBUG - Submitted a subsequent task to executor service. 2023-02-23T05:51:21.221124Z
    INFO - input = 50 | result = 100 at 2023-02-23T05:51:21.753899Z
    INFO - input = 70 | result = 140 at 2023-02-23T05:51:22.215306Z
    INFO - input = 60 | result = 120 at 2023-02-23T05:51:23.023050Z
    DEBUG - Main thread waking after waiting PT20S at 2023-02-23T05:51:39.588413Z
    INFO - Demo ending. 2023-02-23T05:51:39.592896Z