I have a simple route that polls zip files from an SFTP server. Each zip file contains one file that needs processing and zero or more attachments. I am using ZipFileDataFormat for splitting, and I am able to split and route the items as desired, i.e. send the file to be processed to the processor and the other files to the aggregator endpoint.
The route looks like this:
from("sftp://username@server/folder/path?password=password&delay=600000")
    .unmarshal(getZipFileDataFormat())
    .split(body(Iterator.class)).streaming()
        .log("CamelSplitComplete :: ${header.CamelSplitComplete}")
        .log("Split Size :: ${header.CamelSplitSize}")
        .choice()
            .when(header(MyConstants.CAMEL_FILE_NAME_HEADER).contains(".json"))
                .to(JSON_ENDPOINT).endChoice()
            .otherwise()
                .to(AGGREGATOR_ENDPOINT)
        .endChoice()
    .end();
The getZipFileDataFormat() method:
private ZipFileDataFormat getZipFileDataFormat() {
    ZipFileDataFormat zipFile = new ZipFileDataFormat();
    zipFile.setUsingIterator(true);
    return zipFile;
}
The splitting works fine. However, the logs show that the two headers CamelSplitComplete and CamelSplitSize are not set correctly: CamelSplitComplete is always false, and CamelSplitSize has no value.
Because of this, I cannot aggregate based on the size. I am using eagerCheckCompletion() so that the completion check uses the incoming exchange in the aggregator route. My aggregator route looks like this:
from(AGGREGATOR_ENDPOINT)
    .aggregate(new ZipAggregationStrategy()).constant(true)
    .eagerCheckCompletion().completionSize(header("CamelSplitSize"))
    .to("file:///tmp/")
    .end();
I read in the Apache Camel documentation that these headers are always set. Am I missing something here? Any pointer in the right direction would be very helpful.
I was able to get the whole route to work. I had to add a pre-processor that sets the essential headers I need for aggregation (the outgoing file name and the number of files in the zip).
from("sftp://username@server/folder/path?password=password&delay=600000")
    .to("file:///tmp/")
    .beanRef("headerProcessor")
    .unmarshal(getZipFileDataFormat())
    .split(body(Iterator.class)).streaming()
        .choice()
            .when(header(Exchange.FILE_NAME).contains(".json"))
                .to(JSON_ENDPOINT).endChoice()
            .otherwise()
                .to(AGGREGATOR_ENDPOINT)
        .endChoice()
    .end();
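The headerProcessor bean itself is not shown here. As a rough sketch of the counting part only, the file count could be obtained by walking the zip entries with the JDK's ZipInputStream. The class name ZipEntryCounter and the method countEntries are hypothetical, and skipping directory entries is an assumption about what the splitter will emit. Whether the .json file should be subtracted from the count depends on which files actually reach the aggregator.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Hypothetical helper: counts the file entries of a zip archive held in memory. */
final class ZipEntryCounter {

    private ZipEntryCounter() {
    }

    /** Returns the number of non-directory entries in the given zip bytes. */
    static int countEntries(byte[] zipBytes) {
        int count = 0;
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            // getNextEntry() returns null once the archive is exhausted.
            while ((entry = zis.getNextEntry()) != null) {
                // Assumption: directory entries are not routed as files, so skip them.
                if (!entry.isDirectory()) {
                    count++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }
}
```

Inside the bean, the result would then be stored with something like exchange.getIn().setHeader(MyConstants.HEADER_TOTAL_FILE_COUNT, count) before the body is unmarshalled.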
With those headers set, the zip aggregation strategy worked as expected. Here is the aggregation route, for completeness:
from(AGGREGATOR_ENDPOINT)
    .aggregate(header(MyConstants.HEADER_OUTGOING_FILE_NAME), new ZipAggregationStrategy())
    .eagerCheckCompletion().completionSize(header(MyConstants.HEADER_TOTAL_FILE_COUNT))
    .setHeader(Exchange.FILE_NAME, simple("${header.outgoingFileName}"))
    .to("file:///tmp/")
    .end();
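To illustrate why the route needs both a correlation header and a count header, here is a small stand-alone model of size-based completion. It is not Camel code and does not reproduce the aggregator's internals; SizeCompletedGroups and its add method are hypothetical names. Items are grouped by a correlation key (the outgoing file name in the route above), and a group is released once it holds the expected number of items.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a size-completed aggregator; this is not Camel code. */
final class SizeCompletedGroups {

    // Items collected so far, keyed by correlation key.
    private final Map<String, List<String>> pending = new HashMap<>();

    /**
     * Adds one item to its group. Returns the completed group once it holds
     * expectedSize items, or an empty Optional while the group is still filling up.
     */
    Optional<List<String>> add(String correlationKey, String item, int expectedSize) {
        List<String> group = pending.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(item);
        if (group.size() >= expectedSize) {
            // The group is complete: forget it so the same key can start a new group later.
            pending.remove(correlationKey);
            return Optional.of(group);
        }
        return Optional.empty();
    }
}
```

In this model, an item that arrives without an expected size can never complete its group, which matches the symptom in the question where CamelSplitSize had no value.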