I have a block of code like this:
def downloadFilesSource(bucketName: String, primaryKey: String)(
    implicit ec: ExecutionContext
): Source[(Source[ByteString, NotUsed], String), NotUsed] =
  S3.listBucket(bucketName, prefix = Some(primaryKey))
    .mapAsync(configuration.config.downloadParallelism.value)(
      (r: ListBucketResultContents) =>
        Future {
          S3.download(r.bucketName, r.key).zip(Source.single(r.key))
        }
    )
    .flatMapConcat(identity)
    .map {
      case (Some(x), key) => (x._1, key)
      case (None, _)      => throw new RuntimeException()
    }
which downloads all the files in an Amazon S3 bucket and returns (a Source of) the contents tupled with the name of the file.
After upgrading to Alpakka 4.0.0, I found that the S3.download
method has been removed. The following code seemed like a good replacement:
def downloadFilesSource(bucketName: String, primaryKey: String)(
    implicit ec: ExecutionContext
): Source[(ByteString, String), NotUsed] =
  S3.listBucket(bucketName, prefix = Some(primaryKey))
    .mapAsync(configuration.config.downloadParallelism.value)(
      (r: ListBucketResultContents) =>
        Future {
          S3.getObject(r.bucketName, r.key).zip(Source.single(r.key))
        }
    )
    .flatMapConcat(identity)
However, the contents of the file are always truncated.
As an experiment, I also tried (in the function body):
S3.listBucket(bucketName, prefix = Some(primaryKey))
  .map((r: ListBucketResultContents) =>
    S3.getObject(r.bucketName, r.key).zip(Source.single(r.key))
  )
  .flatMapConcat(identity)
in case I wasn't waiting on the future correctly, but the files are truncated in the same way. I assume there's something I'm just missing about the streaming nature of Alpakka.
I've looked at Alpakka and S3 truncating downloaded files, but I don't see that the answers there are relevant :(
The source of the problem seems to be that, as the Akka docs state, a zip of sources completes as soon as any one of the zipped streams completes. Source.single
is defined to complete after emitting one element, so if getObject
emits its content in multiple chunks, the zip completes after the first chunk and every subsequent chunk is discarded. (The old S3.download emitted a single Option, so the same zip happened to be safe there.)
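The truncation is easy to reproduce with plain Akka Streams, without S3 at all (a minimal sketch; the element values are made up):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ZipTruncationDemo extends App {
  implicit val system: ActorSystem = ActorSystem("zip-demo")

  // A three-element source zipped with a single-element source:
  // Zip completes as soon as the shorter side completes, so only
  // the first pair is emitted and elements 2 and 3 are dropped.
  val result = Source(1 to 3)
    .zip(Source.single("key"))
    .runWith(Sink.seq)

  println(Await.result(result, 3.seconds)) // prints Vector((1,key))
  system.terminate()
}
```

This is exactly what happens to a multi-chunk getObject stream zipped with Source.single(r.key).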
Handling the get like this, and mapping the complete ByteString to a tuple afterwards (in place of the zip), avoids the truncation:
S3.getObject(r.bucketName, r.key)
  .fold(ByteString.empty)(_ ++ _)         // accumulate every chunk into one ByteString
  .map(byteString => (byteString, r.key)) // attach the key only after the stream completes
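For completeness, here is a sketch of how the whole function might look under Alpakka 4 with the fold in place of the zip. It keeps the configuration.config.downloadParallelism setting from the question; note the return type now carries one complete ByteString per file rather than a stream of chunks:

```scala
import akka.NotUsed
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

def downloadFilesSource(bucketName: String, primaryKey: String)(
    implicit ec: ExecutionContext
): Source[(ByteString, String), NotUsed] =
  S3.listBucket(bucketName, prefix = Some(primaryKey))
    .mapAsync(configuration.config.downloadParallelism.value)(
      (r: ListBucketResultContents) =>
        Future {
          S3.getObject(r.bucketName, r.key)
            .fold(ByteString.empty)(_ ++ _)          // accumulate every chunk
            .map(byteString => (byteString, r.key))  // pair with the key afterwards
        }
    )
    .flatMapConcat(identity)
```

One caveat: fold buffers each entire object in memory before emitting, so this suits many small files better than a few very large ones.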