javaapache-flinkflink-streaming

NoClassDefFoundError when running flink with kafka connector


I am trying to stream data from kafka using flink. My code compiles without error but on running I get the following error:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: 
    org/apache/flink/streaming/util/serialization/DeserializationSchema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more  

My POM dependency list is as follows:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-core</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>  
    </dependencies>

The java code that I am trying to run just subscribes to a kafka topic called 'streamer':

import java.util.Properties;
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class StreamConsumer {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "samplegroup");

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>("streamer", new SimpleStringSchema(), properties));

        messageStream.rebalance().map(new MapFunction<String, String>() {
                        private static final long serialVersionUID = -6867736771747690202L;
                        @Override
                        public String map(String value) throws Exception {
                                return "Streamed data: " + value;
                        }
                }).print();
        env.execute();
}
}

System information:
1. Kafka version: 0.9.0.1
2. Flink version: 1.3.2
3. OpenJDK version: 1.8

Although I am using maven, I do not think this is any maven issue because I get the same error even when I try without maven. I manually downloaded all the necessary .jar files to a folder and specified that folder path with the -cp option while compiling with javac. I get the same error as above during runtime but no errors during compile time.


Solution

  • I figured out the reason and it seems like a really silly error now. In my case the jar packages weren't available at run time. I ended up not using maven at all. I compiled with javac -cp <path_to_jar_files> and executed again with java -cp <path_to_jar_files>