javaapache-sparkpysparkapache-kafkaspark-kafka-integration

PySpark - NoClassDefFoundError: kafka/common/TopicAndPartition


I'm running Spark version 2.3.0.2.6.5.1175-1 with Python 3. 6.8 on Ambari. While submitting the application I get the following logs in stderr

22/06/15 12:29:31 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint Exception in thread "Thread-10" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition at java.lang.Class.getDeclaredMethods0(Native Method) at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) at java.lang.Class.privateGetPublicMethods(Class.java:2902) at java.lang.Class.getMethods(Class.java:1615) at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:274) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 12 more 22/06/15 12:29:33 ERROR ApplicationMaster: User application exited with status 1

Following are the stdout logs

    ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "read_stream.py", line 13, in <module>
    stream.start(callback=callback)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 125, in start
    output = self.readStream().foreachRDD(lambda rdd: self.process(rdd, callback))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/__pyfiles__/OwlsenseStream.py", line 70, in readStream
    messageHandler=lambda x: (x.topic, x.message))
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/streaming/kafka.py", line 150, in createDirectStream
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1655212331279_0010/container_e170_1655212331279_0010_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o93.createDirectStreamWithMessageHandler

I've providing the following jar files

spark-sql-kafka-0-10_2.11-2.3.0.jar

spark-streaming-kafka-0-8_2.11-2.3.0.jar

metrics-core-2.2.0.jar

Is this some kind of configuration issue or is there something wrong in the code?

Edit: I am using livy to submit the job to the cluster. Below is the POST request code

    headers = {
    'X-Requested-By': 'admin',
    'Content-Type': 'application/json',
}

data = {
        "numExecutors": stream['executors'],
        "executorCores": stream['cores'],
        "executorMemory": stream['memory'],
        "driverMemory": "2g",
        "file": stream['path'],
        "queue": "default",
        "pyFiles": [
            "hdfs:///user/bds/elastic_search/es_pyspark_handler.py",
            "hdfs:///user/bds/realtime/OwlsenseStream.py"
        ],
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.yammer.metrics:metrics-core:2.2.0"
        },
        "name": stream['name']
    }

    data = json.dumps(data)

    response = requests.post(url=f"http://{IP_ADDRESS}:8998/batches", headers=headers, data=data,
                             verify=False)

Solution

  • I was unable to run the code on these versions probably there was something wrong with the jars versions. I changed the versions to the following:

    I used the following packages:

    --packages com.yammer.metrics:metrics-core:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2