scala, akka-stream, akka-http, alpakka

akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity while uploading a large file to S3 using akka http


I am trying to upload a large file (90 MB for now) to S3 using Akka HTTP with the Alpakka S3 connector. It works fine for small files (25 MB), but when I try to upload a larger file (90 MB), I get the following error:

akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun$1.applyOrElse(MultipartUnmarshallers.scala:108)
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun$1.applyOrElse(MultipartUnmarshallers.scala:103)
at akka.stream.impl.fusing.Collect$$anon$6.$anonfun$wrappedPf$1(Ops.scala:227)
at akka.stream.impl.fusing.SupervisedGraphStageLogic.withSupervision(Ops.scala:186)
at akka.stream.impl.fusing.Collect$$anon$6.onPush(Ops.scala:229)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:510)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:485)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:739)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:765)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Although I get the success message at the end, the file is not uploaded completely. Only 45-50 MB of it gets uploaded.

I am using the code below.

S3Utility.scala:

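    import akka.actor.ActorSystem
    import akka.http.scaladsl.server.directives.FileInfo
    import akka.stream.Materializer
    import akka.stream.alpakka.s3.MultipartUploadResult
    import akka.stream.alpakka.s3.scaladsl.S3
    import akka.stream.scaladsl.Sink
    import akka.util.ByteString
    import scala.concurrent.Future

    class S3Utility(implicit as: ActorSystem, m: Materializer) {
      private val bucketName = "test"

      // Sink that streams the uploaded bytes into an S3 multipart upload named after the file.
      def sink(fileInfo: FileInfo): Sink[ByteString, Future[MultipartUploadResult]] =
        S3.multipartUpload(bucketName, fileInfo.fileName)
    }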

Routes:

    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import scala.util.{Failure, Success}

    def uploadLargeFile: Route =
      post {
        path("import" / "file") {
          extractMaterializer { implicit materializer =>
            withoutSizeLimit {
              fileUpload("file") {
                case (metadata, byteSource) =>
                  logger.info(s"Request received to import large file: ${metadata.fileName}")
                  val uploadFuture = byteSource.runWith(s3Utility.sink(metadata))
                  onComplete(uploadFuture) {
                    case Success(_) =>
                      logger.info("Successfully uploaded file")
                      complete(StatusCodes.OK)
                    case Failure(ex) =>
                      logger.error(s"Error in uploading file: $ex")
                      complete(StatusCodes.FailedDependency, ex.getMessage)
                  }
              }
            }
          }
        }
      }

Any help would be appreciated. Thanks


Solution

  • Strategy 1

    Try breaking the file into smaller chunks and uploading them as separate parts of an S3 multipart upload. Here is sample code using the AWS SDK for Java (v1):

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.*;
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    // Placeholder endpoint, signing region and credentials: replace them with your own.
    AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("some-kind-of-endpoint", "us-east-1"))
            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("user", "pass")))
            .disableChunkedEncoding()
            .withPathStyleAccessEnabled(true)
            .build();

    // Collect the ETag returned for each uploaded part; S3 needs them to complete the upload.
    List<PartETag> partETags = new ArrayList<>();

    // Step 1: Initialize.
    InitiateMultipartUploadRequest initRequest =
            new InitiateMultipartUploadRequest("bucket", "key");
    InitiateMultipartUploadResult initResponse =
            s3Client.initiateMultipartUpload(initRequest);

    File file = new File("filepath");
    long contentLength = file.length();
    long partSize = 5L * 1024 * 1024; // 5 MiB, the S3 minimum for every part except the last.

    try {
        // Step 2: Upload parts.
        long filePosition = 0;
        for (int i = 1; filePosition < contentLength; i++) {
            // The last part can be smaller than 5 MiB, so shrink it to the remaining bytes.
            partSize = Math.min(partSize, contentLength - filePosition);

            // Create a request that uploads one part, read from the file at the given offset.
            UploadPartRequest uploadRequest = new UploadPartRequest()
                    .withBucketName("bucket").withKey("key")
                    .withUploadId(initResponse.getUploadId()).withPartNumber(i)
                    .withFileOffset(filePosition)
                    .withFile(file)
                    .withPartSize(partSize);

            // Upload the part and keep its ETag.
            partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());

            filePosition += partSize;
        }

        // Step 3: Complete.
        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(
                "bucket", "key", initResponse.getUploadId(), partETags);
        s3Client.completeMultipartUpload(compRequest);
    } catch (Exception e) {
        // Abort so the parts uploaded so far are discarded, then rethrow instead of swallowing the error.
        s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(
                "bucket", "key", initResponse.getUploadId()));
        throw e;
    }
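The offset arithmetic in the upload loop above can be pulled out into a small helper that does not depend on the AWS SDK, which makes it easy to check the part boundaries before uploading anything. This is only a sketch: `PartPlanner`, `Part` and `plan` are hypothetical names, not SDK classes. The constants encode S3's multipart limits (every part except the last must be at least 5 MiB, a part can be at most 5 GiB, and an upload can have at most 10,000 parts).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the AWS SDK): plans the byte ranges of an S3 multipart upload.
final class PartPlanner {
    static final long MIN_PART_SIZE = 5L * 1024 * 1024;        // 5 MiB: minimum for every part except the last
    static final long MAX_PART_SIZE = 5L * 1024 * 1024 * 1024; // 5 GiB: maximum size of a single part
    static final int MAX_PARTS = 10_000;                       // maximum number of parts in one upload

    // One planned part: 1-based part number, byte offset into the file, and size in bytes.
    static final class Part {
        final int number;
        final long offset;
        final long size;

        Part(int number, long offset, long size) {
            this.number = number;
            this.offset = offset;
            this.size = size;
        }
    }

    // Same arithmetic as the upload loop: fixed-size parts, with a smaller final part if needed.
    static List<Part> plan(long contentLength, long partSize) {
        if (contentLength < 0) {
            throw new IllegalArgumentException("contentLength must not be negative");
        }
        if (partSize < MIN_PART_SIZE || partSize > MAX_PART_SIZE) {
            throw new IllegalArgumentException("partSize must be between 5 MiB and 5 GiB");
        }
        // Ceiling division without overflow: number of parts needed to cover the whole file.
        long partCount = contentLength / partSize + (contentLength % partSize == 0 ? 0 : 1);
        if (partCount > MAX_PARTS) {
            throw new IllegalArgumentException("file needs " + partCount + " parts; use a larger partSize");
        }

        List<Part> parts = new ArrayList<>();
        long position = 0;
        for (int number = 1; position < contentLength; number++) {
            long size = Math.min(partSize, contentLength - position); // only the last part can be smaller
            parts.add(new Part(number, position, size));
            position += size;
        }
        return parts;
    }
}
```

For example, a 90 MiB file planned with `PartPlanner.plan(90L * 1024 * 1024, PartPlanner.MIN_PART_SIZE)` comes out as 18 parts of exactly 5 MiB each, while a 12 MiB file gives two 5 MiB parts followed by a 2 MiB part at offset 10 MiB.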
    

  • Strategy 2

    Increase the idle-timeout of the Akka HTTP server (for example, set it to infinite) in your configuration, e.g. application.conf:

    akka.http.server.idle-timeout=infinite
    

    This raises the time a connection may stay idle (no data read or written) before the server closes it; the default is 60 seconds. If the connection stays idle for longer than that while the file is still being uploaded, the server closes it, the request entity ends prematurely, and the multipart parser fails with the "Unexpected end of multipart entity" error.