java multithreading kotlin executorservice

ExecutorService out of memory


I have an application that reads very large CSV files, with over 100 million records in one file, and sends appropriately formatted JSON to another service. When I run this code, it initially processes smoothly, but after a few seconds, the processing starts to slow down until it eventually comes to a halt, and an 'out of memory' error occurs. How can I prevent this error?

    private val nThreads = 10
    private val executorService = Executors.newFixedThreadPool(nThreads)
    private val manager = PoolingHttpClientConnectionManager()
    private val httpClient = HttpClients.custom()
        .setConnectionManager(manager)
        .build()

    fun send(body: String) {
        val post = HttpPost(URI.create("url"))
        post.setHeader("Content-Type", "application/json")
        post.entity = StringEntity(body)
        httpClient.execute(post)
    }


    fun process() {
        val lines = AtomicInteger()
        ResourceUtils.getFile("classpath:file.csv").bufferedReader().forEachLine {
            lines.getAndIncrement()
            if (lines.get() > 1) {
                val records = it.split(",")
                executorService.execute {
                    send(generate(records[0], records[1], records[2], records[3]))
                }
                if (lines.get() % 10000 == 0) println(lines)
            }
        }
        executorService.shutdown()
        try {
            if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
                executorService.shutdownNow()
            }
        } catch (e: InterruptedException) {
            System.err.println(e.message)
            executorService.shutdownNow()
            Thread.currentThread().interrupt()
        }
    }
    private fun generate(id: String, firstName: String, lastName: String, dateTime: String): String {
        return """JSON TEXT"""
    }

Solution


    Avoid Executor, use ExecutorService

    Regarding the first problem, read the Javadoc for execute:

    The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.

    That third case, “in the calling thread”, means the task would not make use of any background thread at all.

    The solution is to instead call the submit method of the sub-interface ExecutorService rather than that Executor method.
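As a minimal sketch of that change (with a trivial stand-in task, not the Question’s HTTP call), declaring the variable as ExecutorService and calling submit queues the task to the pool and also hands back a Future for tracking completion:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo
{
    public static void main ( String[] args ) throws Exception
    {
        ExecutorService executorService = Executors.newFixedThreadPool( 2 );
        // `submit` is declared on ExecutorService, and returns a Future.
        Future < String > future = executorService.submit( ( ) -> Thread.currentThread( ).getName( ) );
        // The task ran on a pool thread, not on the calling (main) thread.
        System.out.println( future.get( ).startsWith( "pool-" ) );
        executorService.shutdown( );
    }
}
```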

    Throttle creation of tasks, not just their execution

    For the second problem, you have successfully throttled your tasks’ execution. But you neglected to throttle their creation.

    Your limit of 10 threads throttles the execution of your tasks, but your forEachLine call is not throttled. The forEachLine call immediately creates a task object for each and every one of the 100,000,000 rows, all resident in memory long before the executor service gets around to executing those tasks.
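To see the problem in miniature (with the numbers shrunk far below 100 million to keep the sketch fast), notice how submitting faster than the pool can execute leaves nearly every task piled up in the executor’s unbounded queue, all resident in memory:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo
{
    public static void main ( String[] args )
    {
        ThreadPoolExecutor executor = ( ThreadPoolExecutor ) Executors.newFixedThreadPool( 2 );
        for ( int i = 0 ; i < 10_000 ; i++ )
        {
            executor.execute( ( ) -> {
                try { Thread.sleep( 10 ); } catch ( InterruptedException e ) { Thread.currentThread( ).interrupt( ); }
            } );
        }
        // Submission finished almost instantly, but nearly all 10,000 tasks
        // are still sitting in the queue waiting for one of the 2 threads.
        System.out.println( "Queued tasks: " + executor.getQueue( ).size( ) );
        executor.shutdownNow( );
    }
}
```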

    My first thought about how to throttle their creation is to use a Semaphore with a limited number of permits. In your code that creates the task, try to acquire one of the permits. If one is available, the task object is instantiated and submitted to the executor service.

    Example app

    Here is an example app.

    public class CsvProcessor
    {
        public static void main ( String[] args )
        {
            CsvProcessor app = new CsvProcessor( );
            app.demo( );
        }
    
        private void demo ( )
        {
            final int MAXIMUM_THREADS = 10;
            final Semaphore semaphoreToThrottleTaskCreation = new Semaphore( MAXIMUM_THREADS );
            final Path path = Paths.get( "/Users/basil_dot_work/HundredMillion.csv" );
            try (
                    final BufferedReader bufferedReader = Files.newBufferedReader( path , StandardCharsets.UTF_8 ) ;
                    final ExecutorService executorService = Executors.newFixedThreadPool( MAXIMUM_THREADS )
            )
            {
                bufferedReader.lines( ).forEach(
                        ( String line ) -> {
                            try
                            {
                                semaphoreToThrottleTaskCreation.acquire( );  // Blocks until a permit becomes available.
                                executorService.submit( ( ) -> {
                                    try
                                    {
                                        // Simulate work load for generating JSON.
                                        try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current( ).nextInt( 1 , 10 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
                                        // Simulate work load for making HTTP POST request.
                                        try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current( ).nextInt( 1 , 10 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
                                        System.out.println( "Processing line: " + line );
                                    } finally
                                    {
                                        semaphoreToThrottleTaskCreation.release( );  // Returns a permit back to the pool.
                                    }
                                } );
                            } catch ( InterruptedException e )
                            {
                                throw new RuntimeException( e );
                            }
                        }
                );
            } catch ( IOException e )
            {
                throw new RuntimeException( e );
            }
        }
    }
    

    Notice how we used try-with-resources syntax to automatically close our executor service.

    I only tried this with a data file of twelve rows. So not fully tested. But should get you going in the right direction.

    When run:

    Processing line: 2, Bob, Barker, 2023-02-23T12:34Z
    Processing line: 10, Janice, Anderson, 2023-01-23T12:34Z
    Processing line: 9, Ivar, Channing, 2023-03-23T12:34Z
    Processing line: 7, Gregory, Anderson, 2023-01-23T12:34Z
    Processing line: 3, Carol, Channing, 2023-03-23T12:34Z
    Processing line: 8, Harriet, Barker, 2023-02-23T12:34Z
    Processing line: 6, Francine, Channing, 2023-03-23T12:34Z
    Processing line: 4, Davis, Anderson, 2023-01-23T12:34Z
    Processing line: 1, Alice, Anderson, 2023-01-23T12:34Z
    Processing line: 11, Keith, Barker, 2023-02-23T12:34Z
    Processing line: 5, Edie, Barker, 2023-02-23T12:34Z
    Processing line: 12, Lisa, Channing, 2023-03-23T12:34Z
    

    Use semaphore to control both creation and execution of tasks

    If you reread that code, you may notice that our semaphore for throttling creation of our task objects is actually doing double-duty by also limiting execution of our tasks.

    So we do not really need to specify a number of threads to be pooled by our executor service. We could replace Executors.newFixedThreadPool( MAXIMUM_THREADS ) with Executors.newCachedThreadPool(). This alternate executor service implementation creates any number of threads. But given our use of Semaphore, we know the maximum number of threads in use will be 10. (Assuming we ignore the slight moment between our release of the semaphore and the exiting of our task’s execution.)
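That double-duty claim can be verified in a small sketch (with a 50 ms sleep standing in for real work): even though the cached thread pool would happily create more threads, the semaphore keeps the peak concurrency at or below its permit count.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreThrottleDemo
{
    public static void main ( String[] args ) throws Exception
    {
        final int MAXIMUM_TASKS = 3;
        Semaphore semaphore = new Semaphore( MAXIMUM_TASKS );
        AtomicInteger concurrent = new AtomicInteger( );
        AtomicInteger peak = new AtomicInteger( );
        ExecutorService executorService = Executors.newCachedThreadPool( );
        for ( int i = 0 ; i < 20 ; i++ )
        {
            semaphore.acquire( );  // Blocks the submitting loop until a permit frees up.
            executorService.submit( ( ) -> {
                try
                {
                    int now = concurrent.incrementAndGet( );
                    peak.accumulateAndGet( now , Math :: max );  // Record highest concurrency seen.
                    Thread.sleep( 50 );  // Simulate work.
                    concurrent.decrementAndGet( );
                } catch ( InterruptedException e ) { Thread.currentThread( ).interrupt( ); }
                finally { semaphore.release( ); }  // Return the permit.
            } );
        }
        executorService.shutdown( );
        executorService.awaitTermination( 1 , TimeUnit.MINUTES );
        System.out.println( "Peak concurrency: " + peak.get( ) );  // Never exceeds 3.
    }
}
```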

    Virtual Threads

    We could go a step further and consider using virtual threads rather than platform threads for this work. The tasks described in the Question are making network calls. Such calls involve much blocking, which makes them a candidate for virtual threads. If the tasks are not CPU-bound, and do not involve long-running use of synchronized or native code (JNI, Foreign Functions), then we should be using virtual threads for maximum performance.

    You can reasonably have millions of virtual threads at a time, given that they are so efficient with both memory and CPU. However, we do not want to overwhelm the server being hit by our HTTP POST requests, nor do we want to recreate our problem of running out of memory by loading all 100 million lines in memory. So we need to leave our Semaphore throttle in place. All we need is to replace the Executors. … call. Change this:

    ExecutorService executorService = Executors.newFixedThreadPool( MAXIMUM_THREADS )
    

    … to this:

    ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()
    

    For more info, see Managing Throughput with Virtual Threads - Sip of Java by Billy Korando (2024-02).


    By the way, parsing an enormous CSV file line-by-line to generate JSON for submission over a network web-services call is an extremely inefficient way to communicate data. Consider an alternatives. At a minimum, transmit a batch of lines rather than single lines. Even better, skip CSV parsing, JSON generation, and web-service calls: Upload the file directly using a database’s bulk-data feature like COPY in Postgres.