Is there a way to split the body of an exchange (a List) into chunks bigger than single-element exchanges?
When processing big files, I end up with a huge number of one-line exchanges and a lot of memory consumption.
from(url.concat("RAW(" + filesToInclude + ")"))
    .log("Start processing file: ${header.CamelFileName}")
    .routeId(CamelRoutes.LIST_ROUTE_ID + endpointConfig.getConfigId())
    .setHeader(CamelHeaders.ENDPOINT_CONFIG, constant(endpointConfig))
    .unmarshal(getCsvDataFormat(endpointConfig))
    .split(body())
        .streaming().parallelProcessing().executorService(Executors.newFixedThreadPool(CONCURRENT_THREADS_NUMBER))
        .stopOnException().stopOnAggregateException()
        .marshal().json(JsonLibrary.Jackson)
        .log("Starting aggregating for batch processing")
        .aggregate(header(Exchange.FILE_NAME), new ListAggregationStrategy())
            .completionPredicate(new BatchSizePredicate(endpointConfig.getMaxBatchSize()))
            .completionTimeout(endpointConfig.getBatchTimeout())
            .log("Start one batch processing")
            .bean(rowCamelService, "saveInDB")
            .log("Finished one batch processing")
        .end()
        .log("Finished aggregating for batch processing")
        .log("Finished processing file: ${header.CamelFileName}")
    .end();
For a 60 MB CSV file, it takes over 1 GB of memory. I need to bring that down to 500 MB at most.
You can use setBody(...) to partition the list into batches before splitting, so that each split exchange carries a whole batch instead of a single row:
public class MyRouteBuilder extends RouteBuilder {
    private static final int BATCH_SIZE = 1000;

    private <T> List<List<T>> splitIntoBatches(List<T> list, int batchSize) {
        // TODO implement using one of the suggestions from https://www.baeldung.com/java-list-split
    }

    public void configure() {
        from(...)
            .setBody(exchange -> {
                List<String> list = exchange.getMessage().getBody(List.class);
                return splitIntoBatches(list, BATCH_SIZE);
            })
            .split(body())
                .streaming()
                ...
    }
}
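
The splitIntoBatches helper left as a TODO above could be filled in along these lines. This is only a minimal sketch in plain Java with no Camel dependency; the class name ListBatcher is hypothetical, and the choice to copy each sublist (rather than return subList views) is an assumption made so that batches stay valid if the source list is later modified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class; the name is not part of the original answer.
final class ListBatcher {

    private ListBatcher() {
    }

    // Splits a list into consecutive batches of at most batchSize elements.
    // The last batch may be smaller when the list size is not a multiple of batchSize.
    static <T> List<List<T>> splitIntoBatches(List<T> list, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < list.size(); start += batchSize) {
            int end = Math.min(start + batchSize, list.size());
            // Copy the subList view so each batch is independent of the source list.
            batches.add(new ArrayList<>(list.subList(start, end)));
        }
        return batches;
    }
}
```

For example, calling ListBatcher.splitIntoBatches(List.of(1, 2, 3, 4, 5), 2) yields [[1, 2], [3, 4], [5]]. Note that this only reduces the number of exchanges created by the splitter; the full list produced by unmarshal(...) is still held in memory before it is partitioned.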