scala, apache-kafka, fs2, doobie

How to publish an FS2 Stream generated by Doobie to Kafka


I want to publish a long list of events into Kafka by consuming an fs2.Stream that corresponds to a very big set of DB rows, one that would eventually cause an Out Of Memory error if compiled to a List.

So, let's say that I have a very big list of UUID keys, with millions of records:

def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]

and that I want to publish an event into Kafka for each chunk of 500 keys, using this Publisher:

trait KeyPublisher {
  def publish(event: ChunkOfKeys): IO[Long]
}
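
For context, a stream like getKeyStream can be produced directly by a Doobie query; here is a rough sketch with placeholder table and column names (it also assumes a Get[UUID] instance is in scope, e.g. from doobie-postgres):

import java.sql.Timestamp
import java.time.LocalDateTime
import java.util.UUID
import doobie.implicits._

def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID] =
  sql"SELECT key FROM some_table WHERE created_at < ${Timestamp.valueOf(timeRangeEnd)}" // placeholder query
    .query[UUID] // requires a Get[UUID], e.g. from doobie-postgres's implicits
    .stream      // rows are fetched lazily in batches, never materialised as a List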

I would like to create a function to enqueue/publish this stream into Kafka:

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
  getKeyStream(endDateTime)
     .chunkN(500)
     .evalMap(myChunk => ?????)
     ...
}

How can I consume the stream produced by the DB, split it into chunks of constant size, and then publish each chunk into Kafka?

Apparently it's quite hard to find good documentation or examples on this topic. Could you please point me in the right direction?


Solution

  • Since you don't say what type ChunkOfKeys is, I'm going to assume it's something like Chunk[UUID]. You will also need a Transactor to run the ConnectionIO stream against the database, plus an instance of your publisher, so both are taken as parameters here:

    import java.time.LocalDateTime

    import cats.effect.IO
    import doobie.Transactor
    import doobie.implicits._ // brings the .transact syntax for Stream[ConnectionIO, *] into scope

    def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
        xa: Transactor[IO],
        publisher: KeyPublisher
    ): IO[Unit] =
      getKeyStream(endDateTime)
        .transact(xa)               // convert the ConnectionIO stream into Stream[IO, UUID]
        .chunkN(500)                // regroup it into Stream[IO, Chunk[UUID]], 500 keys per chunk
        .evalMap(publisher.publish) // publish each chunk, giving Stream[IO, Long]
        .compile
        .drain                      // an IO[Unit] that describes the whole process
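
    If you don't already have a concrete KeyPublisher, one possible sketch built directly
    on the plain Java Kafka producer could look like the following. The topic name, the
    comma-separated payload, the ChunkOfKeys alias, and the use of cats-effect 3's
    IO.blocking are all assumptions on my part, not something given in the question:

    import java.util.UUID

    import cats.effect.IO
    import fs2.Chunk
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    type ChunkOfKeys = Chunk[UUID] // assumed alias, matching the assumption above

    final class KafkaKeyPublisher(
        producer: KafkaProducer[String, String], // built once at startup from your producer config
        topic: String
    ) extends KeyPublisher {

      // Placeholder serialisation: a comma-separated list of UUIDs.
      // Use whatever format your consumers actually expect.
      def publish(event: ChunkOfKeys): IO[Long] =
        IO.blocking {
          val payload = event.toList.map(_.toString).mkString(",")
          producer.send(new ProducerRecord[String, String](topic, payload)).get()
          event.size.toLong // report how many keys were published
        }
    }

    You would then build the Transactor and the KafkaProducer once (for example in an
    IOApp) and run enqueueKeyStreamIntoKafka(endDateTime)(xa, publisher). Because
    everything stays a stream, only a bounded number of rows is ever held in memory
    at a time.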