I am working on a file downloader that submits GET requests for around a thousand files. I came across this article, which describes how to submit a large number of requests using the executor framework. A smaller run (around a hundred files) worked. However, the run with the large number of files resulted in a ConnectionClosedException.
This is the download code that submits the requests:
void download(String sObjname, List<FileMetadata> blobList) throws IOException, InterruptedException
{
    long totalSize = 0;
    this.sObjname = sObjname;
    for (FileMetadata doc : blobList)
    {
        totalSize += doc.getSize();
        doc.setStatus(JobStatus.INIT_COMPLETE);
    }
    totalFileSize = new AtomicLong(totalSize);
    // Async client definition; MAX_CONN is around 5-15
    try (CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setMaxConnPerRoute(MAX_CONN)
            .setMaxConnTotal(MAX_CONN).build())
    {
        httpclient.start();
        // Define the callback for handling the response and marking the status
        FutureCallback<String> futureCallback = new FutureCallback<String>() {
            @Override
            public void cancelled()
            {
                logger.error("Task cancelled in the rest client.");
                shutdownLatch.countDown();
            }

            @Override
            public void completed(String docPath)
            {
                FileMetadata doc = futureMap.get(docPath);
                logger.info(doc.getPath() + " download completed");
                totalFileSize.addAndGet(-1 * doc.getSize());
                doc.setStatus(JobStatus.WRITE_COMPLETE);
                shutdownLatch.countDown();
            }

            @Override
            public void failed(Exception e)
            {
                try
                {
                    logger.error("Exception caught under failed for " + sObjname + " " + e.getMessage(), e);
                    Throwable cause = e.getCause();
                    if (cause != null && cause.getClass().equals(ClientProtocolException.class))
                    {
                        String message = cause.getMessage();
                        // TODO Remove this
                        logger.error("Cause message: " + message);
                        String filePath = message.split("Unable to download the file ")[1].split(" ")[0];
                        futureMap.get(filePath).setStatus(JobStatus.WRITE_FAILED);
                    }
                }
                finally
                {
                    // Count down last, so the waiting thread never sees a stale status
                    shutdownLatch.countDown();
                }
            }
        };
        // Submit the get requests here
        String folderPath = SalesforceUtility.getFolderPath(sObjname);
        new File(new StringBuilder(folderPath).append(File.separator).append(Constants.FILES).toString()).mkdir();
        String body = (sObjname.equals(Constants.contentVersion)) ? "/VersionData" : "/body";
        shutdownLatch = new CountDownLatch(blobList.size());
        for (FileMetadata doc : blobList)
        {
            String uri = baseUri + "/sobjects/" + sObjname + "/" + doc.getId() + body;
            HttpGet httpGet = new HttpGet(uri);
            httpGet.addHeader(oauthHeader);
            doc.setStatus(JobStatus.WRITING);
            // Producer definition
            HttpAsyncRequestProducer producer = HttpAsyncMethods.create(httpGet);
            // Consumer definition
            File docFile = new File(doc.getPath());
            HttpAsyncResponseConsumer<String> consumer = new ZeroCopyConsumer<String>(docFile) {
                @Override
                protected String process(final HttpResponse response, final File file,
                        final ContentType contentType) throws Exception
                {
                    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK)
                    {
                        throw new ClientProtocolException("Unable to download the file " + file.getAbsolutePath()
                                + ". Error code: " + response.getStatusLine().getStatusCode() + "; Error message: "
                                + response.getStatusLine());
                    }
                    return file.getAbsolutePath();
                }
            };
            // Register the document before executing, so the callback can always find it
            futureMap.put(doc.getPath(), doc);
            // Execute the request
            logger.info("Submitted download for " + doc.getPath());
            httpclient.execute(producer, consumer, futureCallback);
        }
        if (futureMap.size() > 0)
            schedExec.scheduleAtFixedRate(timerRunnable, 0, 5, TimeUnit.MINUTES);
        logger.debug("Waiting for download results for " + sObjname);
        shutdownLatch.await();
    }
    finally
    {
        schedExec.shutdown();
        schedExec.awaitTermination(24, TimeUnit.HOURS);
        logger.debug("Finished downloading files for " + sObjname);
    }
}
The stack trace that I received was:
org.apache.http.ConnectionClosedException: Connection closed unexpectedly
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:139) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [httpasyncclient-4.1.1.jar:4.1.1]
at org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [httpasyncclient-4.1.1.jar:4.1.1]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:102) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:281) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:442) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:285) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590) [httpcore-nio-4.4.4.jar:4.4.4]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_72]
The same exception was logged for a number of workers.
Thanks to @lucasvc; the default behaviour is explained here. As for my solution, I updated the client to set explicit connect and socket timeouts on the I/O reactor, as shown in the following code, and the issue no longer appeared.
IOReactorConfig reactorConfig = IOReactorConfig.custom()
        .setConnectTimeout(TIMEOUT_5_MINS_IN_MILLIS)
        .setSoTimeout(TIMEOUT_5_MINS_IN_MILLIS).build();
try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom()
        .setDefaultIOReactorConfig(reactorConfig)
        .setDefaultHeaders(Collections.singletonList(oauthHeader))
        .setMaxConnPerRoute(MAX_CONN)
        .setMaxConnTotal(MAX_CONN).build()) {
    // ...
}
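
A side note on the download code in the question: the shared callback works out which file failed by parsing the message of a ClientProtocolException. A ConnectionClosedException carries no file path, so files that fail that way are never marked WRITE_FAILED. One possible way around this, which I have not run against the real client, is to create one callback per request so that it captures the file it belongs to. The sketch below uses only the JDK: Callback, Status and the thread pool are hypothetical stand-ins for FutureCallback, JobStatus and the async client, and the "download" is simulated.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PerRequestCallbackSketch {
    // Hypothetical stand-in for the JobStatus values used in the question
    enum Status { WRITING, WRITE_COMPLETE, WRITE_FAILED }

    // Hypothetical stand-in for FutureCallback<String>
    interface Callback {
        void completed(String path);
        void failed(Exception e);
    }

    // One callback per request: it captures the path, so failed() does not
    // need to recover the file name from the exception message.
    static Callback callbackFor(String path, Map<String, Status> statuses, CountDownLatch latch) {
        return new Callback() {
            @Override
            public void completed(String ignored) {
                statuses.put(path, Status.WRITE_COMPLETE);
                latch.countDown(); // count down only after the status is recorded
            }

            @Override
            public void failed(Exception e) {
                statuses.put(path, Status.WRITE_FAILED);
                latch.countDown();
            }
        };
    }

    // Simulates the submit loop: paths listed in 'failing' fail with an exception
    // whose message contains no file path, like ConnectionClosedException.
    static Map<String, Status> run(List<String> paths, Set<String> failing) throws InterruptedException {
        Map<String, Status> statuses = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(paths.size());
        ExecutorService pool = Executors.newFixedThreadPool(4); // stand-in for the async client
        try {
            for (String path : paths) {
                statuses.put(path, Status.WRITING); // register before submitting
                Callback callback = callbackFor(path, statuses, latch);
                pool.submit(() -> {
                    if (failing.contains(path)) {
                        callback.failed(new IOException("Connection closed unexpectedly"));
                    } else {
                        callback.completed(path);
                    }
                });
            }
            latch.await(); // every request has reported completed or failed
        } finally {
            pool.shutdown();
        }
        return statuses;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Status> result =
                run(Arrays.asList("a.bin", "b.bin", "c.bin"), Collections.singleton("b.bin"));
        // prints {a.bin=WRITE_COMPLETE, b.bin=WRITE_FAILED, c.bin=WRITE_COMPLETE}
        System.out.println(new TreeMap<>(result));
    }
}
```

With the real client, the equivalent would presumably be to build a new FutureCallback inside the for loop, referencing doc directly, and pass it to httpclient.execute(producer, consumer, callback). The trade-off is one small extra object per request in exchange for not depending on the exception message format.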