Tags: scala, apache-kafka, apache-flink, scala-shell

Read from Kafka into Flink Scala Shell


I'm attempting to connect to and read from Kafka (2.1) on my local machine, in the scala-shell that comes with Flink (1.7.2).

Here's what I'm doing:

:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

After the last statement, I'm getting the following error:

scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
 cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
   var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

I have created the topic named "topic" and I'm able to produce and consume messages from it correctly through another client. I'm using Java 1.8.0_201 and following the instructions from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html.

Any help on what could be going wrong?


Solution

  • Some dependencies implicitly need other dependencies. We usually use a dependency manager such as Maven or sbt, and when we add a dependency to the project, the manager provides its transitive dependencies in the background.

    On the other hand, when you use shells, where there is no dependency manager, you are responsible for providing your code's dependencies yourself. Using the Flink Kafka connector explicitly needs the Flink Connector Kafka jar, but notice that Flink Connector Kafka has dependencies of its own. You can find them at the bottom of the connector's Maven repository page, in the Compile Dependencies section. Starting from this premise, I added the following jar files to the directory FLINK_HOME/lib (the Flink classpath); a dependency-manager equivalent is sketched right after the list:

    flink-connector-kafka-0.11_2.11-1.4.2.jar
    flink-connector-kafka-0.10_2.11-1.4.2.jar    
    flink-connector-kafka-0.9_2.11-1.4.2.jar   
    flink-connector-kafka-base_2.11-1.4.2.jar  
    flink-core-1.4.2.jar                                         
    kafka_2.11-2.1.1.jar
    kafka-clients-2.1.0.jar
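
    For comparison, this is roughly what a dependency manager would resolve for you: in a managed project you would only declare the connector, and sbt would pull in flink-connector-kafka-base, kafka-clients, and the rest transitively. A minimal build.sbt sketch (the project name and Scala patch version are illustrative):

    name := "flink-kafka-shell-test"
    scalaVersion := "2.11.12"

    // Declaring only the connector; its compile dependencies are resolved transitively
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-streaming-scala" % "1.4.2" % "provided",
      "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.4.2"
    )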
    

    With those jars on the classpath, I could successfully consume Kafka messages using the following code in the Flink shell:

    scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    
    scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    
    scala> import java.util.Properties
    import java.util.Properties
    
    scala> val properties = new Properties()
    properties: java.util.Properties = {}
    
    scala> properties.setProperty("bootstrap.servers", "localhost:9092")
    res0: Object = null
    
    scala> properties.setProperty("group.id", "test")
    res1: Object = null
    
    scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
    warning: there was one deprecation warning; re-run with -deprecation for details
    stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091
    
    scala> senv.execute("Kafka Consumer Test")
    Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
    03/11/2019 21:42:39 Job execution switched to status RUNNING.
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
    hello
    hello
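
    The two hello lines are messages that had been produced to the topic; for example, they could be sent with the console producer that ships with Kafka (run from the Kafka installation directory):

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
    >hello
    >hello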
    

    Alternatively, you can add jar files to the Flink classpath by passing them as arguments to the Flink shell start command:

    bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"
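
    For example, assuming the connector jar sits in /opt/jars (an illustrative path), the shell could be started as:

    bin/start-scala-shell.sh local --addclasspath /opt/jars/flink-connector-kafka-0.11_2.11-1.4.2.jar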
    

    Test environment:

    Flink 1.4.2
    Kafka 2.1.0
    Java  1.8.0_201
    Scala 2.11