scala, apache-kafka, akka, akka-http, reactive-kafka

Found java.util.concurrent.Future, required scala.concurrent.Future


Related: scala.concurrent.Future wrapper for java.util.concurrent.Future
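(For context, the wrapper approach referenced there usually amounts to completing a scala.concurrent.Promise from the Callback overload of KafkaProducer.send, rather than blocking on the returned Java future. A rough sketch — the helper name sendAsScalaFuture is made up here:)

import scala.concurrent.{Future, Promise}
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Complete a Scala Promise from the producer's callback instead of blocking on the Java future.
def sendAsScalaFuture[K, V](producer: KafkaProducer[K, V],
                            record: ProducerRecord[K, V]): Future[RecordMetadata] = {
  val promise = Promise[RecordMetadata]()
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception == null) promise.success(metadata) else promise.failure(exception)
  })
  promise.future
}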

This came from my other question:

How to integrate akka streams kafka (reactive-kafka) into akka http application?

I have an Akka HTTP application, and I'd like to send a message (a ProducerRecord) to Kafka from the onComplete directive in my route, like the following:

val producer: KafkaProducer[Array[Byte], String] = producerSettings.createKafkaProducer()

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

However, the onComplete(producer.send(producerRecord)) call generates the following type mismatch error:

[error] found   : Future[org.apache.kafka.clients.producer.RecordMetadata] (in java.util.concurrent)
[error] required: Future[org.apache.kafka.clients.producer.RecordMetadata] (in scala.concurrent)
[error]   onComplete(producer.send(producerRecord)) { _ =>

Is there any way around this, maybe by using the Producer as a sink (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) instead of the Java producer.send call?


Solution

  • You could leverage Cake's Scala-based Kafka client, which does the work of running the Java futures and handing you Scala futures back. Once you make sure you create a cakesolutions.kafka.KafkaProducer instead of an org.apache.kafka.clients.producer.KafkaProducer, the rest of your code can stay practically the same (a rough sketch of that setup follows the Reactive Kafka snippet below).

    Alternatively, you can sort this out with Reactive Kafka while still using the high-level Akka HTTP DSL, by running your producer record into a Kafka sink, like this:

    val producerSink = Producer.plainSink(producerSettings)

    ...

    // inside the route
    val producerRecord =
      new ProducerRecord[Array[Byte], String]("topic1", "some message")

    onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
      complete(ToResponseMarshallable((StatusCodes.Created, u)))
    }