Tags: scala, akka-stream

Akka Streams - Redistribute chunks into max_permissible_chunk_size (Scala)


My code uploads a binary to S3 using Akka Streams, as shown below:

    source
      .via(distributeChunks(MAX_BYTES_PER_CHUNK))
      .throttle(maxRequestsPerSecond, 1.second, maximumBurst = 1, ThrottleMode.Shaping)
      .runFoldAsync(1) { (partNumber, bytes) =>
        {...}

I need to write distributeChunks so that every emitted chunk is exactly MAX_BYTES_PER_CHUNK bytes, except the last chunk, which may be smaller when the remaining bytes don't fill a full chunk.

I tried this:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import scala.collection.mutable.ArrayBuffer

private def distributeChunks(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]
      .statefulMapConcat { () =>
        var buffer = ByteString.empty

        { bs: ByteString =>
          buffer ++= bs
          val chunks = new ArrayBuffer[ByteString]

          while (buffer.length >= maxChunkSize) {
            val (chunk, rest) = buffer.splitAt(maxChunkSize)
            chunks += chunk
            buffer = rest
          }

          chunks.toList
        }
      }
      .mapMaterializedValue(_ => NotUsed)

This ensures every emitted chunk is exactly MAX_BYTES_PER_CHUNK, but it never emits the final partial chunk: whatever is left in the buffer when the stream completes is simply dropped, and I am not sure how to fix that. Can someone help me reason about this and come up with code that produces the desired result?
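To double-check the problem, I replayed the buffering logic in plain Scala, with no Akka involved (the fragment sizes below are made up for illustration): the final 1MB indeed never leaves the buffer, because nothing runs when the input ends.

```scala
object DroppedTailDemo {
  val MB: Int = 1024 * 1024

  /** Replay the statefulMapConcat body over a list of fragment sizes:
    * accumulate into a buffer and emit only full maxChunkSize chunks.
    * Returns (emitted chunk sizes, bytes stranded in the buffer). */
  def replay(fragments: List[Int], maxChunkSize: Int): (List[Int], Int) = {
    var buffer = 0
    var emitted = List.empty[Int]
    fragments.foreach { size =>
      buffer += size
      while (buffer >= maxChunkSize) {
        emitted :+= maxChunkSize
        buffer -= maxChunkSize
      }
    }
    (emitted, buffer)
  }

  def main(args: Array[String]): Unit = {
    // 9MB arriving as arbitrary network fragments (sizes invented for the demo)
    val (chunks, stranded) = replay(List(3 * MB, 4 * MB, 2 * MB), 2 * MB)
    println(s"emitted: ${chunks.map(_ / MB).mkString(", ")} MB chunks") // 2, 2, 2, 2
    println(s"stranded in buffer: ${stranded / MB} MB")                 // 1
  }
}
```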

Here are two test cases:

FILE SIZE: 10MB, MAX_PERMISSIBLE_CHUNK: 2MB should break into 5 chunks of 2MB.
FILE SIZE: 9MB, MAX_PERMISSIBLE_CHUNK: 2MB should break into 4 chunks of 2MB and 1 chunk of exactly 1MB.
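For reference, the expected chunk sizes are just integer division plus a remainder. A small helper (the name `expectedChunkSizes` is mine, only to check the arithmetic of the two test cases):

```scala
object ChunkMath {
  val MB: Int = 1024 * 1024

  /** Expected chunk sizes for a file of `total` bytes: full chunks of `max`,
    * plus one final smaller chunk when `total` is not a multiple of `max`. */
  def expectedChunkSizes(total: Long, max: Int): List[Long] = {
    val full = List.fill((total / max).toInt)(max.toLong)
    val rem = total % max
    if (rem == 0) full else full :+ rem
  }

  def main(args: Array[String]): Unit = {
    println(expectedChunkSizes(10L * MB, 2 * MB).map(_ / MB)) // List(2, 2, 2, 2, 2)
    println(expectedChunkSizes(9L * MB, 2 * MB).map(_ / MB))  // List(2, 2, 2, 2, 1)
  }
}
```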

Solution

  • After a lot of debugging I realised that I need custom inlet and outlet handlers (a custom GraphStage), so that the bytes are accumulated correctly and the remainder is flushed when upstream completes:

    import akka.NotUsed
    import akka.stream.scaladsl.Flow
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
    import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
    import akka.util.ByteString

    def distributeChunks(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
        Flow.fromGraph(new GraphStage[FlowShape[ByteString, ByteString]] {
          val in: Inlet[ByteString] = Inlet[ByteString]("distributeChunks.in")
          val out: Outlet[ByteString] = Outlet[ByteString]("distributeChunks.out")
          override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
    
          override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
            new GraphStageLogic(shape) {
              private var buffer: ByteString = ByteString.empty
    
              // Emits one full chunk from the buffer; callers check the length first.
              private def pushChunk(): Unit = {
                val (chunk, rest) = buffer.splitAt(maxChunkSize)
                buffer = rest
                push(out, chunk)
              }
    
              setHandler(in, new InHandler {
                override def onPush(): Unit = {
                  buffer ++= grab(in)
                  if (buffer.length >= maxChunkSize) pushChunk()
                  else pull(in) // not enough for a full chunk yet, ask for more
                }
    
                override def onUpstreamFinish(): Unit = {
                  if (buffer.nonEmpty) {
                    // Flush the remainder: full chunks first, then one final
                    // chunk that may be smaller than maxChunkSize. emitMultiple
                    // also covers the case where a downstream pull is pending.
                    emitMultiple(out, buffer.grouped(maxChunkSize))
                  }
                  completeStage()
                }
              })
    
              setHandler(out, new OutHandler {
                override def onPull(): Unit = {
                  // A single large upstream element can leave more than one full
                  // chunk in the buffer; drain those before pulling again.
                  if (buffer.length >= maxChunkSize) pushChunk()
                  else if (!hasBeenPulled(in)) pull(in)
                }
              })
            }
        })
    

    The problem with the while-loop approach is that you have no control over how the client's bytes arrive: the size of each incoming ByteString fragment depends on the network configuration and many other factors, so fragments never line up with chunk boundaries, and statefulMapConcat offers no callback on stream completion to flush what is left in the buffer. A custom stage gets exactly that hook (onUpstreamFinish), which is why I used one to sort this out.
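    The stage's buffer discipline can be sanity-checked without Akka. This sketch replays the same accumulate/emit/flush logic over fragmented input (the tiny fragment sizes are invented, standing in for network fragments) and shows the "exact size except possibly the last" invariant holding:

    ```scala
    object ChunkInvariantCheck {
      /** Replays the stage's buffer discipline in plain Scala: accumulate
        * fragments, emit full chunks, then flush the remainder at end of
        * input: the step the statefulMapConcat version was missing. */
      def rechunk(fragments: List[Vector[Byte]], max: Int): List[Vector[Byte]] = {
        var buffer = Vector.empty[Byte]
        val out = List.newBuilder[Vector[Byte]]
        fragments.foreach { frag =>
          buffer ++= frag
          while (buffer.length >= max) {
            val (chunk, rest) = buffer.splitAt(max)
            out += chunk
            buffer = rest
          }
        }
        if (buffer.nonEmpty) out += buffer // the onUpstreamFinish flush
        out.result()
      }

      def main(args: Array[String]): Unit = {
        // 9 bytes arriving as uneven fragments, standing in for 9MB of data
        val fragments = List(Vector[Byte](1, 2, 3), Vector[Byte](4, 5, 6, 7, 8), Vector[Byte](9))
        val chunks = rechunk(fragments, max = 4)
        println(chunks.map(_.length)) // List(4, 4, 1): all full except the last
      }
    }
    ```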

    On a side note, I just noticed that Akka has moved to the Business Source License (BSL), which is sad; hopefully the existing functionality can be used as is. :)