scala akka-stream

Exception thrown while executing Akka Streams doesn't contain any references to the source code


There is an Akka Streams Scala application that consumes messages from Kafka topics, converts them to DynamoDB requests, and executes them. Sometimes the app throws exceptions like this:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -> 200 OK Default(13512 bytes)
    at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:103)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:240)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -> 200 OK Default(13512 bytes)
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
    at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:223)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:218)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:182)
    ... 23 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -> 200 OK Default(13512 bytes)
    at akka.http.impl.engine.client.pool.SlotState$WaitingForResponseEntitySubscription.onTimeout(SlotState.scala:313)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$onTimeout$1(NewHostConnectionPool.scala:187)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$event0$1(NewHostConnectionPool.scala:189)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.runOneTransition$1(NewHostConnectionPool.scala:277)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.loop$1(NewHostConnectionPool.scala:364)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.updateState(NewHostConnectionPool.scala:373)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.updateState(NewHostConnectionPool.scala:267)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.$anonfun$updateState$2(NewHostConnectionPool.scala:289)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1.$anonfun$safeCallback$1(NewHostConnectionPool.scala:616)
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1.$anonfun$safeCallback$1$adapted(NewHostConnectionPool.scala:616)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

The stack trace only leads back to library method calls, not to the actual application source code. Any ideas on where to look for the actual cause of the error, or how to set up Akka to report the exception properly?

Here is roughly the code that makes requests to DynamoDB:

  def makeRequest[In <: DynamoDbRequest, Out <: DynamoDbResponse](
      client: DynamoDbAsyncClient,
      retries: DynamoDBRetries,
      request: In
  )(implicit
      dbOp: DynamoDbOp[In, Out],
      system: ActorSystem
  ): Future[Out] = {
    implicit val implicitClient: DynamoDbAsyncClient = client
    val source = RestartSource.onFailuresWithBackoff(
      RestartSettings(retries.minBackoff, retries.maxBackoff, retries.randomFactor)
    ) { () =>
      Source.single(request).via(DynamoDb.flow(1)).map(Success.apply).recover {
        // some error handling
      }
    }
    source.map(_.get).runWith(Sink.head)
  }

and here is the code that chains the calls to DynamoDB:

    makeRequest(
      client,
      config.retries,
      req
    ).flatMap { result => makeRequest(
      client,
      config.retries,
      anotherRequest
    ) }

Solution

  • I'm not sure what your code is doing, but based on "Implications of the streaming nature of Request/Response Entities - Integrating with Akka Streams", it sounds like you are consuming HTTP responses without handling the entity.

    This error indicates that the HTTP response was available for too long without its entity being consumed.

    A possible workaround:

    The problem can be partially worked around by increasing the subscription timeout, but you still run the risk of hitting network-level timeouts, and you could still exceed the timeout under load:

    akka.http.host-connection-pool.response-entity-subscription-timeout = 10.seconds
    

    The suggested solution is to implement handling of the entity:

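    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{HttpRequest, Uri}
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import akka.util.ByteString
    import scala.concurrent.{ExecutionContext, Future}

    implicit val system: ActorSystem = ActorSystem()
    implicit val dispatcher: ExecutionContext = system.dispatcher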
    
    case class ExamplePerson(name: String)
    
    def parse(line: ByteString): Option[ExamplePerson] =
      line.utf8String.split(" ").headOption.map(ExamplePerson.apply)
    
    val requests: Source[HttpRequest, NotUsed] = Source
      .fromIterator(() =>
        Range(0, 10).map(i => HttpRequest(uri = Uri(s"https://localhost/people/$i"))).iterator
      )
    
    val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] =
      Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0))
    
    // Run and completely consume a single akka http request
    def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] =
      Http()
        .singleRequest(req)
        .flatMap { response =>
          response.entity.dataBytes
            .runReduce(_ ++ _)
            .map(parse)
        }
    
    // Run each akka http flow to completion, then continue processing. You'll want to tune the `parallelism`
    // parameter to mapAsync -- higher values will create more cpu and memory load which may or may not positively
    // impact performance.
    requests
      .mapAsync(2)(runRequest)
      .via(processorFlow)
      .runWith(Sink.ignore)
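
The example above reads the whole body. When the body is not needed at all, the error message itself points at the other option, `response.discardEntityBytes()`. The following is a minimal sketch of that variant, assuming Akka 2.6+ and Akka HTTP 10.2+ (so that an implicit `ActorSystem` is enough to provide a materializer). The object name `DiscardExample` and the helpers `drain` and `statusOnly` are hypothetical names chosen for illustration. In the question's code the request is executed inside the AWS SDK's HTTP layer rather than in application code, so this only applies where the application calls Akka HTTP directly.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode}
import scala.concurrent.{ExecutionContext, Future}

object DiscardExample {
  // Drains the response body without reading it and returns only the status code.
  // Draining releases the pooled connection, so the entity subscription timeout
  // does not fire for this response.
  def drain(response: HttpResponse)(implicit system: ActorSystem): Future[StatusCode] = {
    implicit val ec: ExecutionContext = system.dispatcher
    response.discardEntityBytes().future.map(_ => response.status)
  }

  // Sends a single request and discards whatever body comes back.
  def statusOnly(req: HttpRequest)(implicit system: ActorSystem): Future[StatusCode] = {
    implicit val ec: ExecutionContext = system.dispatcher
    Http().singleRequest(req).flatMap(response => drain(response))
  }
}
```

The returned future completes only after the body has been fully drained, so chaining further requests on it does not leave an unconsumed entity behind.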
    

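On the other half of the question, why the stack trace contains only library frames: the failure is raised on a pool or dispatcher thread after the calling method has already returned, so the application frames are no longer on the stack when the exception is created. One common way to get them back is to capture the call site eagerly and attach the library exception as its cause. The sketch below uses only the Scala standard library; `CallSiteTracing` and `withCallSite` are hypothetical names, not part of Akka or the AWS SDK.

```scala
import scala.concurrent.{ExecutionContext, Future}

object CallSiteTracing {
  // Hypothetical helper: wraps an asynchronous call so that a failure carries
  // the stack trace of the code that started it.
  def withCallSite[A](label: String)(body: => Future[A])(implicit ec: ExecutionContext): Future[A] = {
    // Created eagerly on the caller's thread, so its stack trace contains the
    // application frames that lead to this call.
    val callSite = new RuntimeException(s"Async call failed: $label")
    body.recoverWith { case cause =>
      // Keep the original library exception as the cause of the call-site exception.
      callSite.initCause(cause)
      Future.failed(callSite)
    }
  }
}
```

Applied to the question's code, each call would be wrapped along the lines of `CallSiteTracing.withCallSite("first request")(makeRequest(client, config.retries, req))`. The trade-off is that an exception object is allocated for every call, including successful ones, so it may be worth enabling this only while diagnosing the problem.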