There is an akka streams scala application, it consumes messages from kafka topics, converts them to Dynamo DB requests, and executes them. Sometimes the app throws exceptions these exceptions:
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -> 200 OK Default(13512 bytes)
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:103)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:240)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -> 200 OK Default(13512 bytes)
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:223)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:218)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:182)
... 23 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -> 200 OK Default(13512 bytes)
at akka.http.impl.engine.client.pool.SlotState$WaitingForResponseEntitySubscription.onTimeout(SlotState.scala:313)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$onTimeout$1(NewHostConnectionPool.scala:187)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$event0$1(NewHostConnectionPool.scala:189)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.runOneTransition$1(NewHostConnectionPool.scala:277)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.loop$1(NewHostConnectionPool.scala:364)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.updateState(NewHostConnectionPool.scala:373)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.updateState(NewHostConnectionPool.scala:267)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.$anonfun$updateState$2(NewHostConnectionPool.scala:289)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1.$anonfun$safeCallback$1(NewHostConnectionPool.scala:616)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1.$anonfun$safeCallback$1$adapted(NewHostConnectionPool.scala:616)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java. base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
The stack trace only leads back to the libraries method calls, but not to the actual application source code, any ideas on where to search for the actual cause of the error or how to setup akka to properly report the exception?
Here is roughly the code that makes requests to the dynamo DB:
def makeRequest[In <: DynamoDbRequest, Out <: DynamoDbResponse](
client: DynamoDbAsyncClient,
retries: DynamoDBRetries,
request: In
)(implicit
dbOp: DynamoDbOp[In, Out],
system: ActorSystem
): Future[Out] = {
implicit val implicitClient: DynamoDbAsyncClient = client
val source = RestartSource.onFailuresWithBackoff(
RestartSettings(retries.minBackoff, retries.maxBackoff, retries.randomFactor)
) { () =>
Source.single(request).via(DynamoDb.flow(1)).map(Success.apply).recover {
// some error handling
}
}
source.map(_.get).runWith(Sink.head)
}
and here is the code that chains the calls to the DDB:
makeRequest(
client,
config.retries,
req
).flatMap { result => makeRequest(
client,
config.retries,
anotherRequest
) }
I'm not sure what is your code doing, but based on Implications of the streaming nature of Request/Response Entities - Integrating with Akka Streams, sounds like you are trying to consume http responses without handling the entity.
This error indicates that the http response has been available for too long without being consumed.
The workaround could be
It can be partially worked around by increasing the subscription timeout, but you will still run the risk of running into network level timeouts and could still exceed the timeout under load
akka.http.host-connection-pool.response-entity-subscription-timeout = 10.seconds
The suggested solution is implement the handling of the entity
implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher
case class ExamplePerson(name: String)
def parse(line: ByteString): Option[ExamplePerson] =
line.utf8String.split(" ").headOption.map(ExamplePerson.apply)
val requests: Source[HttpRequest, NotUsed] = Source
.fromIterator(() =>
Range(0, 10).map(i => HttpRequest(uri = Uri(s"https://localhost/people/$i"))).iterator
)
val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] =
Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0))
// Run and completely consume a single akka http request
def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] =
Http()
.singleRequest(req)
.flatMap { response =>
response.entity.dataBytes
.runReduce(_ ++ _)
.map(parse)
}
// Run each akka http flow to completion, then continue processing. You'll want to tune the `parallelism`
// parameter to mapAsync -- higher values will create more cpu and memory load which may or may not positively
// impact performance.
requests
.mapAsync(2)(runRequest)
.via(processorFlow)
.runWith(Sink.ignore)
For more info: