```
start reading data from kafka
Using Any for unsupported type: typing.Sequence[~T]
Traceback (most recent call last):
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 104, in <module>
read_from_kafka()
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 83, in read_from_kafka
env.execute("Kafka Streaming Job")
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\datastream\stream_execution_environment.py", line 813, in execute
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\util\exceptions.py", line 146, in deco
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9.execute.
: java.net.MalformedURLException: no protocol: ['file:/C:/flink-1.19.0-bin-scala_2.12/flink-1.19.0/opt/flink-python-1.19.0.jar']
at java.base/java.net.URL.<init>(URL.java:645)
at java.base/java.net.URL.<init>(URL.java:541)
at java.base/java.net.URL.<init>(URL.java:488)
at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 14 more
```
I am trying to execute a Kafka consumer job in Flink using PyFlink; my code is written in Python with the pyflink package, and I am running the Flink dashboard on Windows. I did all the configuration setup, and a simple word-count Python application job executed successfully, but this Kafka consumer Python code is the only one that does not execute in the Flink dashboard.
Please help me resolve this.
Short Answer: all you need is the Kafka connector for your version of Flink. (As an aside, the `no protocol: ['file:/...']` in your trace suggests the jar paths reached the configuration as a stringified Python list; `pipeline.jars` expects a single semicolon-separated string, as the code below shows.)
Where do you get it? Simple: from the Maven Central repository (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka).
But wait: you are using Flink 1.19, and the Kafka connector for this version has not yet been published as of 23-Mar-2024 (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/).
So you need to build it from source. And how do you do that? Luckily, the dependencies are already available:
```
git clone https://github.com/apache/flink-connector-kafka.git
cd flink-connector-kafka
mvn clean package -Dflink.version=1.19.0 -DskipTests
```
Copy the built connector jar (it lands under the module's target/ directory) together with kafka-clients into a local folder, then install the matching PyFlink release:

```
pip install apache-flink==1.19.0
```
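Since the connector jar has to match the Flink runtime exactly, it may be worth confirming which PyFlink version is actually installed; a quick check using only the standard library:

```python
# Print the installed apache-flink version; expect "1.19.0"
from importlib.metadata import version

print(version("apache-flink"))
```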
Then wire the jars into your job:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
s_env = TableEnvironment.create(env_settings)

# FLINK 1.19.0: directory holding the jars built above.
# Use an absolute path starting with '/'; on Windows the resulting
# URLs take the file:///C:/... form instead.
DEPS_DIR = "/SOME_PATH/lib-flink-1.19"

# Register the connector and the Kafka client with the pipeline.
# Each option must be ONE string with ';' between the jar URLs.
s_env.get_config().set(
    "pipeline.jars",
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;"
    f"file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)
s_env.get_config().set(
    "pipeline.classpaths",
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;"
    f"file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)

# Optional: dump the effective configuration to verify the settings took
s_env.get_config().get_configuration().to_dict()
```
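Your traceback shows the DataStream API (`StreamExecutionEnvironment`) rather than the Table API; if you stay with it, the equivalent wiring is `add_jars`. A minimal sketch, assuming the same DEPS_DIR as above:

```python
from pyflink.datastream import StreamExecutionEnvironment

DEPS_DIR = "/SOME_PATH/lib-flink-1.19"  # same directory as above

env = StreamExecutionEnvironment.get_execution_environment()
# Same rule as pipeline.jars: each argument is a single 'file://...' URL.
env.add_jars(
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar",
    f"file://{DEPS_DIR}/kafka-clients-3.7.0.jar",
)
```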
Back in the Table API, with the jars registered, declare the Kafka-backed table (note the comma after `price FLOAT`, which the DDL needs to parse):

```python
s_env.execute_sql("DROP TABLE IF EXISTS t1")
s_env.execute_sql("""
    CREATE TABLE t1 (
        symbol STRING,
        price FLOAT,
        ltt TIMESTAMP(3),
        WATERMARK FOR ltt AS ltt - INTERVAL '1' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test',
        'properties.bootstrap.servers' = 'kafka-host:port',
        'properties.group.id' = 'MY_TEST_GROUP',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")
```