Pyflink getting data from kafka producer. When producer sends data in json-fomat, JsonRowDeserializationSchema works fine, but when I'm sending data in avro format, AvroRowDeserializationSchema falls with next exception:
Exception in thread "Thread-4" java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.CurrentThreadClassLoadingStrategy.classForName(CurrentThreadClassLoadingStrategy.java:40)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionUtil.classForName(ReflectionUtil.java:51)
at org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil.forName(TypeUtil.java:243)
at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.getUnknownMember(ReflectionCommand.java:175)
at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.execute(ReflectionCommand.java:87)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumReader
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 9 more
Exception while sending command.
Traceback (most recent call last):
File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1224, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1229, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
File "/opt/examples/app.py", line 215, in <module>
flink_consumer.load_data_to_pg()
File "/opt/examples/app.py", line 187, in load_data_to_pg
source = self._read_from_kafka()
File "/opt/examples/app.py", line 157, in _read_from_kafka
deserialization_schema = self._get_serialization_schema()
File "/opt/examples/app.py", line 146, in _get_serialization_schema
}}"""
File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/avro.py", line 206, in __init__
File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1661, in __getattr__
py4j.protocol.Py4JError: org.apache.flink.formats.avro.AvroRowDeserializationSchema does not exist in the JVM
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1087)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 13 more
Here is the part of consumer code, where AvroRowDeserializationSchema used:
def _get_serialization_schema(self):
data_transfer_format = self.config['source']['kafka']["data_tranfer_format"]
if data_transfer_format == "json":
json_schema = str(self._create_json_schema())
json_deserialization_schema = JsonRowDeserializationSchema \
.builder() \
.json_schema(json_schema=
f'''{{
"type": "object",
"properties": {json_schema}
}}''').build()
return json_deserialization_schema
elif data_transfer_format == "avro":
avro_schema = str(self._create_avro_schema())
avro_deserialization_schema = AvroRowDeserializationSchema(
avro_schema_string=f"""
{{
"name": "kafka_message",
"type": "record",
"fields": [
{avro_schema}
]
}}"""
)
return avro_deserialization_schema
Here is Dockerfile, where I'm downloading several libraries (.jars):
FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
# Install python3.7 and pyflink
# Pyflink does not yet function with python3.9, and this image is build on
# debian bullseye which ships with that version, so build python3.7 here.
RUN set -ex; \
apt-get update && \
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
tar -xvf Python-3.7.9.tgz && \
cd Python-3.7.9 && \
./configure --without-tests --enable-shared && \
make -j4 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
apt -y install python3-pip && \
apt-get install -y openjdk-8-jdk && \
apt-get install -y ant && \
python -m pip install --upgrade pip; \
pip install apache-flink==${FLINK_VERSION}; \
pip install kafka-python; \
pip install py4j;
# Download connector libraries
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/${FLINK_VERSION}/flink-avro-${FLINK_VERSION}.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/${FLINK_VERSION}/flink-sql-avro-${FLINK_VERSION}.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro-confluent-registry/${FLINK_VERSION}/flink-avro-confluent-registry-${FLINK_VERSION}.jar; \
wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/${FLINK_VERSION}/flink-connector-jdbc-${FLINK_VERSION}.jar;
RUN echo "taskmanager.memory.jvm-metaspace.size: 512m" >> /opt/flink/conf/flink-conf.yaml;
WORKDIR /opt/flink
And also I've added several .jars right in consumer code:
AVRO_JAR_PATH = f"file://{current_directory}/avro-1.3.3.jar"
FLINK_AVRO_JAR_PATH = f"file://{current_directory}/flink-avro-1.17.1.jar"
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(AVRO_JAR_PATH, FLINK_AVRO_JAR_PATH)
I think, that there's no mistakes in code and I'm missing something to install or add to Dockerfile or code. May be versions of libraries? Also, I tried this:
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
avro_deserialization_schema = JSchemaParser().parse(f"""
{{
"name": "kafka_message",
"type": "record",
"fields": [
{avro_schema}
]
}}""")
and getting the same exception:
py4j.protocol.Py4JError: org.apache.avro.Schema does not exist in the JVM
Tell, if it's necessary to add some code to my question or something else. Consumer code works in docker with kafka and flink.
I've solved the problem! I've just changed the Docker image from apache/flink:1.16.2-scala_2.12-java11
to apache/flink:1.16.2-scala_2.12-java8
. It works fine now without any problems.