
How to solve "avro.AvroRowDeserializationSchema does not exist in the JVM" when trying to get data using pyflink

I'm using PyFlink to read data from a Kafka producer. When the producer sends data in JSON format, JsonRowDeserializationSchema works fine, but when it sends data in Avro format, AvroRowDeserializationSchema fails with the following exception:

Exception in thread "Thread-4" java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.CurrentThreadClassLoadingStrategy.classForName(
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionUtil.classForName(
        at org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil.forName(
        at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.getUnknownMember(
        at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.execute(
        at java.base/ Source)
Caused by: java.lang.ClassNotFoundException:
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
        ... 9 more
Exception while sending command.
Traceback (most recent call last):
  File "/opt/flink/opt/python/", line 1224, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/flink/opt/python/", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/flink/opt/python/", line 1229, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/opt/examples/", line 215, in <module>
  File "/opt/examples/", line 187, in load_data_to_pg
    source = self._read_from_kafka()
  File "/opt/examples/", line 157, in _read_from_kafka
    deserialization_schema = self._get_serialization_schema()
  File "/opt/examples/", line 146, in _get_serialization_schema
  File "/opt/flink/opt/python/", line 206, in __init__
  File "/opt/flink/opt/python/", line 1661, in __getattr__
py4j.protocol.Py4JError: org.apache.flink.formats.avro.AvroRowDeserializationSchema does not exist in the JVM
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(
        at org.apache.flink.client.ClientUtils.executeProgram(
        at org.apache.flink.client.cli.CliFrontend.executeProgram(
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(
        at org.apache.flink.client.cli.CliFrontend.main(
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(
        ... 13 more

Here is the part of the consumer code where AvroRowDeserializationSchema is used:

    def _get_serialization_schema(self):

        data_transfer_format = self.config['source']['kafka']["data_tranfer_format"]

        if data_transfer_format == "json":
            json_schema = str(self._create_json_schema())

            # Build the JSON deserializer from a JSON-schema string.
            json_deserialization_schema = JsonRowDeserializationSchema \
                .builder() \
                .json_schema(f"""{{
                  "type": "object",
                  "properties": {json_schema}
                }}""") \
                .build()

            return json_deserialization_schema

        elif data_transfer_format == "avro":

            avro_schema = str(self._create_avro_schema())

            # The field list was cut off in the post, so it stays elided here.
            avro_deserialization_schema = AvroRowDeserializationSchema(
                avro_schema_string=f"""{{
                        "name": "kafka_message",
                        "type": "record",
                        "fields": [...]
                }}"""
            )

            return avro_deserialization_schema

Here is the Dockerfile, where I'm downloading several libraries (.jars):

FROM apache/flink:1.16.2-scala_2.12-java11
# Install Python 3.7 and PyFlink
# PyFlink does not yet work with Python 3.9, and this image is built on
# Debian bullseye, which ships with that version, so build Python 3.7 here.
RUN set -ex; \
  apt-get update && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
  wget && \
  tar -xvf Python-3.7.9.tgz && \
  cd Python-3.7.9 && \
  ./configure --without-tests --enable-shared && \
  make -j4 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/* && \
  apt -y install python3-pip && \
  apt-get install -y openjdk-8-jdk && \
  apt-get install -y ant && \
  python -m pip install --upgrade pip; \
  pip install apache-flink==${FLINK_VERSION}; \
  pip install kafka-python; \
  pip install py4j;

# Download connector libraries
RUN wget -P /opt/flink/lib/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/${FLINK_VERSION}/flink-avro-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/${FLINK_VERSION}/flink-sql-avro-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/${FLINK_VERSION}/flink-avro-confluent-registry-${FLINK_VERSION}.jar; \
    wget -P /opt/flink/lib/${FLINK_VERSION}/flink-connector-jdbc-${FLINK_VERSION}.jar;

RUN echo "taskmanager.memory.jvm-metaspace.size: 512m" >> /opt/flink/conf/flink-conf.yaml;
WORKDIR /opt/flink

I've also added several .jars directly in the consumer code:

AVRO_JAR_PATH = f"file://{current_directory}/avro-1.3.3.jar"
FLINK_AVRO_JAR_PATH = f"file://{current_directory}/flink-avro-1.17.1.jar"
env = StreamExecutionEnvironment.get_execution_environment()
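
For reference, here is a minimal sketch of how such jar paths can be built and checked before they are handed to Flink. The helper name `jar_uris` is hypothetical and not part of the code above. It only uses the standard library, and the commented usage line assumes the jars are registered through `StreamExecutionEnvironment.add_jars`, which takes one or more `file://` URLs.

```python
from pathlib import Path


def jar_uris(directory, names):
    """Return file:// URIs for the named jars, failing early if one is missing.

    `jar_uris` is an illustrative helper, not a PyFlink API.
    """
    uris = []
    for name in names:
        path = Path(directory).resolve() / name
        if not path.is_file():
            # A wrong path here would otherwise only surface later,
            # as a missing class on the JVM side.
            raise FileNotFoundError(f"jar not found: {path}")
        uris.append(path.as_uri())
    return uris


# Assumed usage inside the consumer (requires PyFlink, so it is commented out):
# env = StreamExecutionEnvironment.get_execution_environment()
# env.add_jars(*jar_uris(current_directory, ["flink-avro-1.17.1.jar"]))
```

Failing on a missing file up front separates "the jar is not where I think it is" from "the jar is there but the class still cannot be loaded".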

I don't think there are mistakes in the code, so I'm probably missing something that needs to be installed or added to the Dockerfile or the code. Maybe it's the library versions? I also tried this:

            JSchemaParser = get_gateway()
            # The field list was cut off in the post, so it stays elided here.
            avro_deserialization_schema = JSchemaParser().parse(f"""{{
                        "name": "kafka_message",
                        "type": "record",
                        "fields": [...]
            }}""")

and got the same kind of exception:

py4j.protocol.Py4JError: org.apache.avro.Schema does not exist in the JVM

Tell me if I need to add more code or anything else to my question. The consumer code runs in Docker alongside Kafka and Flink.


  • I've solved the problem! I changed the Docker image from apache/flink:1.16.2-scala_2.12-java11 to apache/flink:1.16.2-scala_2.12-java8. It now works without any problems.
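
For anyone hitting the same trace: the `NoClassDefFoundError` for `org/apache/avro/io/DatumReader` means the JVM could not load an Avro class while loading `AvroRowDeserializationSchema`. A quick diagnostic is to check which jars actually contain the classes in question. The sketch below is only an illustration: the function name `find_class_in_jars` is made up, and `/opt/flink/lib` is assumed as the library directory because that is where the Dockerfile above puts its downloads.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(lib_dir, class_name):
    """Return the names of jars under lib_dir that contain class_name.

    class_name is a dotted name such as "org.apache.avro.io.DatumReader".
    For a nested class, pass its binary name (e.g. "pkg.Outer$Inner").
    """
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # An empty or truncated download is not a valid archive.
            print(f"not a valid jar: {jar.name}")
    return hits


if __name__ == "__main__":
    # /opt/flink/lib is an assumption taken from the Dockerfile in the question.
    for cls in (
        "org.apache.avro.io.DatumReader",
        "org.apache.flink.formats.avro.AvroRowDeserializationSchema",
    ):
        found = find_class_in_jars("/opt/flink/lib", cls)
        print(cls, "->", found or "not found")
```

If a class is reported as "not found", or a jar is reported as invalid, the problem is in what was downloaded rather than in the Python code.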