We have a DownloadFileFlow class which uses Akka Streams and Akka HTTP to download files from different domains. Sometimes (about 50% of the time), when offering to the QueueSource, we immediately get a StreamDetachedException:
java.util.concurrent.CompletionException: akka.stream.StreamDetachedException: Stage with GraphStageLogic akka.stream.impl.QueueSource$$anon$1-queueSource stopped before async invocation was processed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:747)
at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735)
at java.base/java.util.concurrent.CompletableFuture.thenAcceptAsync(CompletableFuture.java:2186)
at scala.concurrent.java8.FuturesConvertersImpl$CF.thenAccept(FutureConvertersImpl.scala:29)
at scala.concurrent.java8.FuturesConvertersImpl$CF.thenAccept(FutureConvertersImpl.scala:18)
at x.y.z.DownloadFileFlow.offer(DownloadFileFlow.java:60)
We have no clue what is causing the error. There is a postStop method in akka.stream.impl.QueueSource that we suspect, but we don't know why the stage is being stopped:
override def postStop(): Unit = {
val exception = new StreamDetachedException()
completion.tryFailure(exception)
}
Could the cause of this problem be a resource leak somewhere? For example, not consuming the HTTP response entity in some cases?
AtomicReference<SourceQueueWithComplete<FileDownloadEnvelope>> queueRef = new AtomicReference<>();
RestartSource.onFailuresWithBackoff(
RestartSettings.create(props.getRecoverMinBackOff(), props.getRecoverMaxBackOff(), RANDOM_FACTOR),
() -> Source.<FileDownloadEnvelope>queue(props.getBufferSize(), OverflowStrategy.backpressure(), props.getMaxConcurrentOffers())
.filter(this::checkExpiration)
.map(x -> Pair.create(createContext(x), x))
.flatMapConcat(this::authIfNeeded)
.mapAsyncUnordered(props.getParallelism(), this::process)
.map(this::reply)
.mapMaterializedValue(x -> {
this.queueRef.set(x);
return x;
}))
.runWith(Sink.ignore(), actorSystem);
The offer method:
public void offer(FileDownloadEnvelope envelope) {
queueRef.get().offer(envelope)
.thenAccept(x -> {
if (!x.isEnqueued())
replyError(envelope);
}).exceptionally(e -> { // StreamDetachedException is caught here
replyError(envelope);
return null;
});
}
Akka Http Props:
akka.http.host-connection-pool.min-connections: "10"
akka.http.host-connection-pool.max-connections: "2000"
akka.http.host-connection-pool.max-open-requests: "4096"
akka.http.host-connection-pool.max-retries: "0"
akka.http.host-connection-pool.client.connecting-timeout: 2s
akka.http.host-connection-pool.client.idle-timeout: 5s
Stream props:
BUFFER-SIZE: "2000"
MAX-CONCURRENT-OFFERS: "2000"
PARALLELISM: "70"
RECOVER-MIN-BACK-OFF: 100ms
RECOVER-MAX-BACK-OFF: 500ms
We are using akka and akka-stream 2.6.19 and akka-http 10.2.0.
The error was caused by a bug in how the process method handled one specific error. In that case an exception was thrown, and apparently throwing an exception fails the stream, so all elements still in the queue are dropped with a StreamDetachedException.
.exceptionally(e -> {
// ...
else if (e.getCause() instanceof FooException) {
throw new RuntimeException(e); // cause
}
// ....
});
Returning an error object instead of throwing an exception resolved the issue.
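The difference between the two variants can be reproduced with plain CompletableFuture, without Akka. The following is only a minimal sketch: Result, unwrap, processThrowing and processReturning are hypothetical names that do not come from the original code, and FooException is a stand-in for the real exception type. With the default supervision strategy, a failed future returned to mapAsyncUnordered fails that stage, whereas a future completed with an error object is just another element.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ProcessResultSketch {

    // Hypothetical stand-in for the FooException mentioned above.
    static class FooException extends RuntimeException {
        FooException(String message) { super(message); }
    }

    // Hypothetical result type: carries either a value or the error that occurred.
    static final class Result {
        final String value;
        final Throwable error;

        private Result(String value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static Result ok(String value) { return new Result(value, null); }
        static Result failed(Throwable error) { return new Result(null, error); }

        boolean isOk() { return error == null; }
    }

    // Strip the CompletionException wrapper that dependent stages add.
    static Throwable unwrap(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    // Problematic variant: rethrowing inside exceptionally leaves the future failed.
    static CompletableFuture<Result> processThrowing(CompletableFuture<String> download) {
        return download.thenApply(Result::ok).exceptionally(e -> {
            throw new RuntimeException(unwrap(e));
        });
    }

    // Fixed variant: the failure is turned into an ordinary value.
    static CompletableFuture<Result> processReturning(CompletableFuture<String> download) {
        return download.thenApply(Result::ok).exceptionally(e -> Result.failed(unwrap(e)));
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new FooException("download failed"));

        // The throwing variant still yields a failed future.
        System.out.println(processThrowing(failed).isCompletedExceptionally()); // prints "true"

        // The returning variant completes normally and carries the error as data.
        Result r = processReturning(failed).join();
        System.out.println(r.isOk() + " " + r.error.getClass().getSimpleName()); // prints "false FooException"
    }
}
```

In the stream, the reply step would then check isOk() (or whatever the real error object exposes) and send an error reply for that one envelope, while the queue and the rest of the stream keep running.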