I'm trying to learn Scala and having a good bit of fun, but I'm running into this classic problem. It reminds me a lot of nested callback hell in the early days of NodeJS.
Here's my program in pseudocode:
At one point I wind up with the type: Task[Iterator[Task[List[Bucket]]]]
Essentially:
The outer task being the initial step to list all the S3 buckets, and then the inside Iterator/Task/List is trying to batch Tasks that return lists.
I would hope there's some way to remove/flatten the outer Task to get to Iterator[Task[List[Bucket]]].
When I try to break down my processing into steps the deep nesting causes me to do many nested maps. Is this the right thing to do or is there a better way to handle this nesting?
In this particular case, I would suggest something like FS2 with Monix as F:
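import cats.implicits._
import monix.eval._, monix.execution._
// runToFuture below needs an implicit Scheduler in scope
import monix.execution.Scheduler.Implicits.global
import fs2._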
// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String
// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)
Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    // checking region, filtering and listing, with at most 10 buckets in flight at once
    bucketRegion(name).flatMap {
      case "my-region" => listObject(name)
      case _           => Task.pure(List.empty[S3Object])
    }
  }
  .foldMonoid // combines List[S3Object] together
  .compile.lastOrError // turns into Task with result
  .map(list => println(s"Result: $list"))
  .onErrorHandle { case error: Throwable => println(error) }
  .runToFuture // or however you handle it
Underneath, FS2 uses cats.effect.IO, Monix Task, or whatever effect type you want, as long as it provides the Cats Effect type class instances. It gives you a nice, functional DSL for designing streams of data, so you can use reactive streams without Akka Streams.
One small problem here is that all the results are printed at once, which might be a bad idea if there are more of them than memory can hold. We could do the printing in batches instead (I wasn't sure whether that is what you wanted), or make filtering and printing separate stages:
Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    bucketRegion(name).map(name -> _)
  }
  .collect { case (name, "my-region") => name }
  .parEvalMap(10) { name =>
    listObject(name).map(list => println(s"Result: $list"))
  }
  .compile
  .drain
While none of that is impossible in bare Monix, FS2 makes such operations much easier to write and maintain, so you should be able to implement your flow much more easily.
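For comparison, here is a rough sketch of what the bare Monix version might look like. It assumes Monix 3.2 or later, where Task.parTraverseN is available (I believe older 3.x releases call it wanderN). The stub implementations and the BareMonixSketch object are made up purely so the snippet runs; swap in your own S3 calls. The point is that flatMap is what removes the outer Task, and parTraverseN turns a list of inner Tasks into a single Task while limiting parallelism.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object BareMonixSketch {
  type BucketName = String
  type BucketRegion = String
  type S3Object = String

  // Hypothetical stubs standing in for real S3 calls
  val fetchS3Buckets: Task[List[BucketName]] = Task.now(List("a", "b", "c"))
  val bucketRegion: BucketName => Task[BucketRegion] =
    name => Task.now(if (name == "b") "other-region" else "my-region")
  val listObject: BucketName => Task[List[S3Object]] =
    name => Task.now(List(s"$name/1", s"$name/2"))

  // flatMap flattens the outer Task; parTraverseN runs at most 10 inner Tasks at a time
  val allObjects: Task[List[S3Object]] =
    fetchS3Buckets.flatMap { names =>
      Task
        .parTraverseN(10)(names) { name =>
          bucketRegion(name).flatMap {
            case "my-region" => listObject(name)
            case _           => Task.now(List.empty[S3Object])
          }
        }
        .map(_.flatten) // List[List[S3Object]] => List[S3Object]
    }

  // Blocking is only for the demo; in real code keep composing the Task
  def run(): List[S3Object] = Await.result(allObjects.runToFuture, 10.seconds)

  def main(args: Array[String]): Unit = println(s"Result: ${run()}")
}
```

This works, but every extra stage (filtering, batching the output, error handling per element) means another layer of flatMap/map around the collection, which is exactly the nesting the FS2 version avoids.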