I have an application that reads very large CSV files (over 100 million records in one file) and sends appropriately formatted JSON to another service. When I run this code, processing is smooth at first, but after a few seconds it slows down until it eventually comes to a halt with an 'out of memory' error. How can I prevent this error?
private val nThreads = 10
private val executorService = Executors.newFixedThreadPool(nThreads)
private val manager = PoolingHttpClientConnectionManager()
private val httpClient = HttpClients.custom()
    .setConnectionManager(manager)
    .build()

fun send(body: String) {
    val post = HttpPost(URI.create("url"))
    post.setHeader("Content-Type", "application/json")
    post.entity = StringEntity(body)
    httpClient.execute(post)
}

fun process() {
    val lines = AtomicInteger()
    ResourceUtils.getFile("classpath:file.csv").bufferedReader().forEachLine {
        lines.getAndIncrement()
        if (lines.get() > 1) {
            val records = it.split(",")
            executorService.execute {
                send(generate(records[0], records[1], records[2], records[3]))
            }
            if (lines.get() % 10000 == 0) println(lines)
        }
    }
    executorService.shutdown()
    try {
        if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
            executorService.shutdownNow()
        }
    } catch (e: InterruptedException) {
        System.err.println(e.message)
        executorService.shutdownNow()
        Thread.currentThread().interrupt()
    }
}

private fun generate(id: String, firstName: String, lastName: String, dateTime: String): String {
    return """JSON TEXT"""
}
Good:
- BufferedReader, to gradually read from the enormous CSV file.

Bad:
- Executor#execute, a method that may or may not run the task on a background thread. Rather than Executor, use ExecutorService.
- Unthrottled creation of a task object for every row.
Regarding the first problem, read the Javadoc for execute:
The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
That third case, “in the calling thread”, means the task would not make use of any background threads.
The solution is to instead call the methods of the sub-interface ExecutorService, such as submit, rather than that Executor method.
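To make that change concrete, here is a minimal, hypothetical sketch (the class name and task body are invented): ExecutorService#submit queues the task for the service's worker threads and hands back a Future you can use to track completion.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo
{
    public static void main ( String[] args ) throws Exception
    {
        ExecutorService executorService = Executors.newFixedThreadPool( 10 );

        // `submit` queues the task for the service's worker threads,
        // and returns a `Future` representing the pending result.
        Future < String > future = executorService.submit( ( ) -> "Ran on " + Thread.currentThread( ).getName( ) );

        System.out.println( future.get( ) );  // Blocks until the task completes.
        executorService.shutdown( );
    }
}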
For the second problem, you have successfully throttled your tasks' execution, but you neglected to throttle their creation. Your limit of 10 threads throttles the execution of your tasks, but your forEachLine call is not throttled: it immediately creates a task object for each and every one of the 100,000,000 rows, all resident in memory long before the executor service gets around to executing those tasks.
My first thought about how to throttle their creation is to use a Semaphore with a limited number of permits. In the code that creates each task, first acquire one of the permits; once a permit becomes available, instantiate the task object and submit it to the executor service.
Here is an example app.
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

// Requires Java 19+, for `Thread.sleep( Duration )` and for `ExecutorService` as `AutoCloseable`.
public class CsvProcessor
{
    public static void main ( String[] args )
    {
        CsvProcessor app = new CsvProcessor( );
        app.demo( );
    }

    private void demo ( )
    {
        final int MAXIMUM_THREADS = 10;
        final Semaphore semaphoreToThrottleTaskCreation = new Semaphore( MAXIMUM_THREADS );
        final Path path = Paths.get( "/Users/basil_dot_work/HundredMillion.csv" );
        try (
                final BufferedReader bufferedReader = Files.newBufferedReader( path , StandardCharsets.UTF_8 ) ;
                final ExecutorService executorService = Executors.newFixedThreadPool( MAXIMUM_THREADS )
        )
        {
            bufferedReader.lines( ).forEach(
                    ( String line ) -> {
                        try
                        {
                            semaphoreToThrottleTaskCreation.acquire( ); // Blocks until a permit becomes available.
                            executorService.submit( ( ) -> {
                                try
                                {
                                    // Simulate work load for generating JSON.
                                    try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current( ).nextInt( 1 , 10 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
                                    // Simulate work load for making HTTP POST request.
                                    try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current( ).nextInt( 1 , 10 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
                                    System.out.println( "Processing line: " + line );
                                } finally
                                {
                                    semaphoreToThrottleTaskCreation.release( ); // Returns a permit back to the pool.
                                }
                            } );
                        } catch ( InterruptedException e )
                        {
                            throw new RuntimeException( e );
                        }
                    }
            );
        } catch ( IOException e )
        {
            throw new RuntimeException( e );
        }
    }
}
Notice how we used try-with-resources syntax to automatically close our executor service. (ExecutorService is AutoCloseable as of Java 19, and its close method waits for submitted tasks to finish.)
I only tried this with a data file of twelve rows. So not fully tested. But should get you going in the right direction.
When run:
Processing line: 2, Bob, Barker, 2023-02-23T12:34Z
Processing line: 10, Janice, Anderson, 2023-01-23T12:34Z
Processing line: 9, Ivar, Channing, 2023-03-23T12:34Z
Processing line: 7, Gregory, Anderson, 2023-01-23T12:34Z
Processing line: 3, Carol, Channing, 2023-03-23T12:34Z
Processing line: 8, Harriet, Barker, 2023-02-23T12:34Z
Processing line: 6, Francine, Channing, 2023-03-23T12:34Z
Processing line: 4, Davis, Anderson, 2023-01-23T12:34Z
Processing line: 1, Alice, Anderson, 2023-01-23T12:34Z
Processing line: 11, Keith, Barker, 2023-02-23T12:34Z
Processing line: 5, Edie, Barker, 2023-02-23T12:34Z
Processing line: 12, Lisa, Channing, 2023-03-23T12:34Z
If you reread that code, you may notice that our semaphore for throttling creation of our task objects is actually doing double-duty by also limiting execution of our tasks. So we do not really need to specify a number of threads to be pooled by our executor service. We could replace Executors.newFixedThreadPool( MAXIMUM_THREADS ) with Executors.newCachedThreadPool( ). This alternate executor service implementation creates any number of threads. But given our use of Semaphore, we know the maximum number of threads in use will be 10. (Assuming we ignore the slight moment between our release of the semaphore and the exiting of our task's execution.)
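If you want to verify that claim, here is a small hypothetical sketch (class name and workload invented) that records peak concurrency while using an unbounded cached thread pool; the Semaphore alone keeps it at 10 or fewer.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreCapDemo
{
    public static void main ( String[] args ) throws InterruptedException
    {
        final Semaphore semaphore = new Semaphore( 10 );
        final AtomicInteger concurrent = new AtomicInteger( );
        final AtomicInteger peak = new AtomicInteger( );
        try ( ExecutorService executorService = Executors.newCachedThreadPool( ) )
        {
            for ( int i = 0 ; i < 1_000 ; i++ )
            {
                semaphore.acquire( );  // Blocks here once 10 tasks are in flight.
                executorService.submit( ( ) -> {
                    try
                    {
                        peak.accumulateAndGet( concurrent.incrementAndGet( ) , Math::max );
                        Thread.sleep( 5 );  // Simulate a bit of work.
                    } catch ( InterruptedException e )
                    {
                        Thread.currentThread( ).interrupt( );
                    } finally
                    {
                        concurrent.decrementAndGet( );
                        semaphore.release( );
                    }
                } );
            }
        }  // Closing the executor service waits for submitted tasks to finish.
        System.out.println( "Peak concurrency: " + peak.get( ) );  // Expect 10 or fewer.
    }
}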
We could go a step further and consider using virtual threads rather than platform threads for this work. The tasks described in the Question are making network calls, and such calls involve much blocking; that makes them a candidate for virtual threads. If the tasks are not CPU-bound, and do not involve long-running use of synchronized or native code (JNI, Foreign Functions), then we should be using virtual threads for maximum performance.
You can reasonably have millions of virtual threads at a time, given that they are so efficient with both memory and CPU. However, we do not want to overwhelm the server being hit by our HTTP POST requests, nor do we want to recreate our problem of running out of memory by loading all 100 million lines into memory. So we need to leave our Semaphore throttle in place. All we need is to replace the Executors.… call. Change this:
ExecutorService executorService = Executors.newFixedThreadPool( MAXIMUM_THREADS )
… to this:
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()
For more info, see Managing Throughput with Virtual Threads - Sip of Java by Billy Korando (2024-02).
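Here is a pared-down, hypothetical sketch of that variant (the loop and printed message are invented), runnable on Java 21 or later, with the Semaphore throttle kept in place.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class VirtualThreadDemo
{
    public static void main ( String[] args ) throws InterruptedException
    {
        final Semaphore semaphore = new Semaphore( 10 );  // Still throttles task creation.
        try ( ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor( ) )
        {
            for ( int i = 0 ; i < 100 ; i++ )
            {
                semaphore.acquire( );  // Blocks when 10 tasks are already in flight.
                final int lineNumber = i;
                executorService.submit( ( ) -> {
                    try
                    {
                        System.out.println( "Processing line " + lineNumber + " on " + Thread.currentThread( ) );
                    } finally
                    {
                        semaphore.release( );
                    }
                } );
            }
        }  // Closing the executor service waits for all submitted tasks to finish.
    }
}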
By the way, parsing an enormous CSV file line-by-line to generate JSON for submission over a network web-services call is an extremely inefficient way to communicate data. Consider alternatives. At a minimum, transmit a batch of lines rather than single lines. Even better, skip the CSV parsing, JSON generation, and web-service calls entirely: upload the file directly using a database's bulk-data feature, like COPY in Postgres.
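For example, with the PostgreSQL JDBC driver you can stream such a file straight into a table through its CopyManager API. Here is a rough sketch; the connection URL, credentials, and the table and column names are all hypothetical, and the pgJDBC driver must be on the classpath.

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;

public class BulkCopyDemo
{
    public static void main ( String[] args ) throws SQLException , IOException
    {
        final Path path = Paths.get( "/Users/basil_dot_work/HundredMillion.csv" );
        try (
                final Connection conn = DriverManager.getConnection( "jdbc:postgresql://localhost:5432/exampledb" , "exampleuser" , "examplepassword" ) ;  // Hypothetical URL and credentials.
                final BufferedReader reader = Files.newBufferedReader( path , StandardCharsets.UTF_8 )
        )
        {
            // `COPY … FROM STDIN` consumes the `Reader` incrementally, so the file is streamed rather than loaded whole into memory.
            long rowCount = conn
                    .unwrap( PGConnection.class )
                    .getCopyAPI( )
                    .copyIn( "COPY person_ ( id_ , first_name_ , last_name_ , date_time_ ) FROM STDIN WITH ( FORMAT csv , HEADER )" , reader );
            System.out.println( "Rows copied: " + rowCount );
        }
    }
}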