I'm attempting to connect to and read from Kafka (2.1) on my local machine, in the scala-shell that comes with Flink (1.7.2).
Here's what I'm doing:
:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
After the last statement, I get the following error:
scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
I have created the topic named "topic", and I can produce messages to it and read them back correctly through another client. I'm using Java version 1.8.0_201 and following the instructions at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html.
Any help on what could be going wrong?
Some dependencies implicitly require other dependencies. We usually use a dependency manager such as Maven or sbt, and when we add a dependency to a project, the dependency manager resolves its transitive dependencies in the background.
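As a rough illustration of what a dependency manager does in this situation, a minimal sbt build might declare only the connector and leave the rest to sbt. This is a sketch for a hypothetical project, not something the shell uses; the versions simply mirror the test environment listed at the end of this answer.

```scala
// build.sbt for a hypothetical project (versions are assumptions that
// mirror the test environment at the end of this answer).
scalaVersion := "2.11.12"

// Only the connector is declared. sbt resolves the jars it depends on,
// such as flink-connector-kafka-base and kafka-clients, transitively.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.4.2"
```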
On the other hand, when you use a shell, where there is no dependency manager, you are responsible for providing your code's dependencies yourself. Using the Flink Kafka connector explicitly needs the Flink Connector Kafka jar, but note that Flink Connector Kafka needs some dependencies, too. You can find its dependencies at the bottom of the page, in the Compile Dependencies section. With that in mind, I added the following jar files to the directory FLINK_HOME/lib (the Flink classpath):
flink-connector-kafka-0.11_2.11-1.4.2.jar
flink-connector-kafka-0.10_2.11-1.4.2.jar
flink-connector-kafka-0.9_2.11-1.4.2.jar
flink-connector-kafka-base_2.11-1.4.2.jar
flink-core-1.4.2.jar
kafka_2.11-2.1.1.jar
kafka-clients-2.1.0.jar
and I could successfully consume Kafka messages using the following code in the Flink shell:
scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
scala> import java.util.Properties
import java.util.Properties
scala> val properties = new Properties()
properties: java.util.Properties = {}
scala> properties.setProperty("bootstrap.servers", "localhost:9092")
res0: Object = null
scala> properties.setProperty("group.id", "test")
res1: Object = null
scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091
scala> senv.execute("Kafka Consumer Test")
Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
03/11/2019 21:42:39 Job execution switched to status RUNNING.
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
hello
hello
Another way to add jar files to the Flink classpath is to pass them as arguments to the Flink shell start command:
bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"
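Whichever way the jars are added, it can help to confirm from inside the shell that the classes the consumer needs are actually loadable before calling senv.addSource. The snippet below is a minimal sketch using only the standard library; the names requiredClasses and isOnClasspath are made up for illustration, and the class list is an assumption you should adapt to the connector version you use.

```scala
// Classes the consumer code relies on. Adjust this list (an assumption)
// to match the connector version you are loading.
val requiredClasses = Seq(
  "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011",
  "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase",
  "org.apache.kafka.clients.consumer.KafkaConsumer"
)

// Hypothetical helper: returns true if the class can be located by the
// class loader that loaded this code, without running static initializers.
def isOnClasspath(className: String): Boolean =
  try {
    Class.forName(className, false, getClass.getClassLoader)
    true
  } catch {
    // The class itself, or something it links against, is missing.
    case _: ClassNotFoundException | _: LinkageError => false
  }

// Print one line per class so a missing jar is easy to spot.
requiredClasses.foreach { name =>
  val status = if (isOnClasspath(name)) "found" else "MISSING"
  println(s"$status  $name")
}
```

A class reported as MISSING usually points to a jar that still has to be added to FLINK_HOME/lib or passed with --addclasspath.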
Test environment:
Flink 1.4.2
Kafka 2.1.0
Java 1.8.0_201
Scala 2.11