apache-flink, pyflink

Apache Flink job execution error when using PyFlink


```
start reading data from kafka
Using Any for unsupported type: typing.Sequence[~T]
Traceback (most recent call last):
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 104, in <module>
    read_from_kafka()
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 83, in read_from_kafka
    env.execute("Kafka Streaming Job")
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\datastream\stream_execution_environment.py", line 813, in execute
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\util\exceptions.py", line 146, in deco
  File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9.execute.
: java.net.MalformedURLException: no protocol: ['file:/C:/flink-1.19.0-bin-scala_2.12/flink-1.19.0/opt/flink-python-1.19.0.jar']
        at java.base/java.net.URL.<init>(URL.java:645)
        at java.base/java.net.URL.<init>(URL.java:541)
        at java.base/java.net.URL.<init>(URL.java:488)
        at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
        at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 14 more
```

I am trying to run a Kafka consumer job in Flink using PyFlink; my code is written in Python with the pyflink package.

The Flink dashboard is running on Windows, and all the configuration is set up. A simple word-count Python job executes successfully, but this Kafka consumer job fails with the error above and never runs in the Flink dashboard.

How can I resolve this?


Solution

  • Short answer: all you need is the Kafka connector for your version of Flink.

    Where do you get it? From the Maven Central repository (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka).

    But wait: you are using Flink 1.19, and the Kafka connector for that version has not yet been published as of 23-Mar-2024 (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/).

    So you need to build it from source. How do you do that? Luckily, the dependencies are already available.

    Now follow these steps:

    1. Make sure you have Java 11 and Maven (I used Maven 3.9.6).
    2. Check out the repo (https://github.com/apache/flink-connector-kafka).
    3. Build the Kafka connector JAR for Flink 1.19 (see the snippet below).
    4. The JAR will be generated at: flink-connector-kafka/target/flink-connector-kafka-3.1-SNAPSHOT.jar
    git clone https://github.com/apache/flink-connector-kafka.git
    cd flink-connector-kafka
    mvn clean package -Dflink.version=1.19.0 -DskipTests
    

    What to do with this JAR now?

    1. Make sure you have apache-flink==1.19.0 installed: pip install apache-flink==1.19.0
    2. Create a table/streaming environment in PyFlink:
    from pyflink.table import EnvironmentSettings, TableEnvironment
    
    env_settings = EnvironmentSettings.in_streaming_mode()
    s_env = TableEnvironment.create(env_settings)
    
    # FLINK 1.19.0
    DEPS_DIR = "/SOME_PATH/lib-flink-1.19"

    # DEPS_DIR already starts with "/", so "file://" + DEPS_DIR yields a
    # well-formed file:///... URL. Flink expects the jar list as ONE
    # semicolon-separated string of URLs, not a Python list.
    jar_urls = (
        f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;"
        f"file://{DEPS_DIR}/kafka-clients-3.7.0.jar"
    )
    s_env.get_config().set("pipeline.jars", jar_urls)
    s_env.get_config().set("pipeline.classpaths", jar_urls)
    
    s_env.get_config().get_configuration().to_dict()
    
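    Note the bracketed value in the MalformedURLException above (`no protocol: ['file:/...']`): it suggests a Python list's string representation ended up where Flink expects a single semicolon-separated string of URLs. A minimal plain-Python sketch of the difference (the jar paths here are hypothetical):

```python
# Hypothetical jar paths -- adjust to your own layout.
jars = [
    "file:///opt/lib-flink-1.19/flink-connector-kafka-3.1-SNAPSHOT.jar",
    "file:///opt/lib-flink-1.19/kafka-clients-3.7.0.jar",
]

# Correct: one semicolon-separated string of URLs.
pipeline_jars = ";".join(jars)
print(pipeline_jars)

# Wrong: str(jars) embeds brackets and quotes, producing a value like
# "['file:/...']" -- the same shape seen in the exception above.
broken = str(jars)
print(broken)
```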

    Test if that works

    s_env.execute_sql('DROP TABLE IF EXISTS t1')
    
    s_env.execute_sql("""
        CREATE TABLE t1 (
          symbol STRING,
          price FLOAT,
          ltt TIMESTAMP(3),
          WATERMARK FOR ltt as ltt - INTERVAL '1' SECOND
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'test',
          'properties.bootstrap.servers' = 'kafka-host:port',
          'properties.group.id' = 'MY_TEST_GROUP',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        )
    """)