Tags: scala, apache-kafka, integration-testing, apache-flink, embedded-kafka

Integration test Flink and Kafka with scalatest-embedded-kafka


I would like to run an integration test with Flink and Kafka. The pipeline reads from Kafka, does some processing with Flink, and writes the resulting data stream back to Kafka.

I would like to test the pipeline from beginning to end. For now I use scalatest-embedded-kafka.

Here is an example; I tried to keep it as simple as possible:

import java.util.Properties

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}

import scala.collection.mutable.ListBuffer

/** Shared fixtures for the Flink/Kafka integration test: a collecting sink, client settings and test data. */
object SimpleFlinkKafkaTest {

  /** Test sink that appends every record it receives to the shared `CollectSink.values` buffer. */
  class CollectSink extends SinkFunction[String] {
    override def invoke(string: String): Unit =
      // Lock the shared buffer itself. `values` lives in the companion object and is shared by every
      // sink instance, so locking `this` (one monitor per parallel instance) would not serialize appends.
      CollectSink.values.synchronized {
        CollectSink.values += string
      }
  }

  object CollectSink {
    /** Records collected by all `CollectSink` instances; synchronize on this buffer when reading it too. */
    val values: ListBuffer[String] = ListBuffer.empty[String]
  }

  val kafkaPort: Int = 9092
  val zooKeeperPort: Int = 2181

  /**
   * Kafka client settings for the Flink source and sink, pointing at the embedded broker.
   *
   * `schema.registry.url` is not set. It is unused with `SimpleStringSchema`, and the old value pointed
   * at the ZooKeeper port, which is not a schema registry.
   */
  val props: Properties = {
    val settings = new Properties()
    settings.setProperty("bootstrap.servers", s"localhost:$kafkaPort")
    settings.setProperty("group.id", "simple-flink-kafka-test")
    // The test publishes its input before the Flink job subscribes. Kafka's default reset policy
    // (`latest`) would skip that record, so start from the earliest offset instead.
    settings.setProperty("auto.offset.reset", "earliest")
    settings
  }

  val inputString: String = "mystring"
  val expectedString: String = "MYSTRING"
}

/** End-to-end test: publish to an input topic, upper-case the records with Flink, read the output topic. */
class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {
      // The Scala DataStream API needs an implicit TypeInformation for every element type. Flink's
      // `createTypeInformation` macro derives it, replacing `TypeInformation.of(classOf[String])`.
      import org.apache.flink.streaming.api.scala.createTypeInformation

      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.Future

      implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
        kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
        zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort
      )

      // The input is published before the job starts, so the consumer must begin at the earliest
      // offset; Kafka's default (`latest`) would silently skip the record and the test would hang.
      val consumerProps = new Properties()
      consumerProps.putAll(SimpleFlinkKafkaTest.props)
      consumerProps.setProperty("group.id", "simple-flink-kafka-test")
      consumerProps.setProperty("auto.offset.reset", "earliest")

      withRunningKafka {

        publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val kafkaConsumer = new FlinkKafkaConsumer011[String](
          "input-topic",
          new SimpleStringSchema(),
          consumerProps
        )

        val kafkaProducer = new FlinkKafkaProducer011[String](
          "output-topic",
          new SimpleStringSchema(),
          SimpleFlinkKafkaTest.props
        )

        env
          .addSource(kafkaConsumer)
          .map(_.toUpperCase)
          .addSink(kafkaProducer)

        // A Kafka source is unbounded, so `execute()` never returns by itself. Run it off the test
        // thread and assert on the output topic instead of waiting for the job to finish.
        // NOTE(review): the job is never cancelled; it keeps running after the embedded broker stops.
        Future(env.execute())

        consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString
      }
    }
  }
}

I had an error, so I added the line `implicit val typeInfo = TypeInformation.of(classOf[String])`, but I don't really understand why I have to do that.

For now this code doesn't work: it runs without interruption, but it never stops and never gives any result.

Does anyone have an idea? Even better would be a suggestion for how to test this kind of pipeline.

Thanks !

EDIT: added env.execute() and updated the error.


Solution

  • Here's a simple solution I came up with.

    The idea is to:

    1. Start Kafka Embedded server
    2. Create your test topics (here input and output)
    3. Launch the Flink job in a Future so it doesn't block the main thread (the Kafka source is unbounded, so env.execute() never returns)
    4. Publish a message to the input topic
    5. Check the result on the output topic

    And the working prototype:

    import java.util.Properties
    
    import org.apache.flink.streaming.api.scala._
    import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
    import org.scalatest.{Matchers, WordSpec}
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    
    /** End-to-end Flink/Kafka test against an embedded broker started on free ports. */
    class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
    
        "runs with embedded kafka on arbitrary available ports" should {
    
            "work" in {
                // Port 0 lets withRunningKafkaOnFoundPort pick free ports. The ports actually bound are
                // only known through `actualConfig`, so all client settings are derived from it.
                val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
    
                withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
                    val properties = new Properties()
                    properties.setProperty("bootstrap.servers", s"localhost:${actualConfig.kafkaPort}")
                    properties.setProperty("zookeeper.connect", s"localhost:${actualConfig.zooKeeperPort}")
                    properties.setProperty("group.id", "test")
                    // Start from the beginning so a record published before the consumer subscribes is not lost.
                    properties.setProperty("auto.offset.reset", "earliest")
    
                    createCustomTopic("input")
                    createCustomTopic("output")
    
                    // Built inside the test so each test run gets its own job graph.
                    val env = StreamExecutionEnvironment.getExecutionEnvironment
                    val kafkaConsumer = new FlinkKafkaConsumer011[String]("input", new SimpleStringSchema(), properties)
                    val kafkaSink = new FlinkKafkaProducer011[String]("output", new SimpleStringSchema(), properties)
                    env
                        .addSource(kafkaConsumer)
                        .map(_.toUpperCase)
                        .addSink(kafkaSink)
    
                    // The Kafka source is unbounded, so execute() never returns; run it off the test thread.
                    Future(env.execute())
    
                    publishStringMessageToKafka("input", "Titi")
                    consumeFirstStringMessageFrom("output") shouldEqual "TITI"
                }
            }
        }
    }