javaapache-sparkapache-kafkaspark-streamingspark-streaming-kafka

Kafka Spark Streaming LocationStrategies java class def not found exception


I am trying to integrate the kafka message broker and spark and facing an issue saying Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/LocationStrategies

Below is the java spark code

package com.test.spell;

import java.util.Arrays;
/**
 * Hello world!
 *
 */
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.spark.api.java.function.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;
public class App 
{
    private static final Pattern SPACE = Pattern.compile(" ");
    public static void main( String[] args )
    {
         String brokers = "localhost:9092";
            String topics = "spark-topic";

            // Create context with a 2 seconds batch interval
            SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

            Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", brokers);

            // Create direct kafka stream with brokers and topics
            JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
            System.out.println("In programn");

            // Get the lines, split them into words, count the words and print
            JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String,String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                    return kafkaRecord.value();
                }
            });
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    System.out.println(line);
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
          /*  JavaPairDStream<String,Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<>(word,1);
                }
            });*/

//          wordCounts.print();

            // Start the computation
            jssc.start();
            jssc.awaitTermination();
    }
}

Below is my pom.xml I have tried many jar file versions couldn't find the right one.

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.spark-project.spark</groupId>
      <artifactId>unused</artifactId>
      <version>1.0.0</version>
      <scope>provided</scope>
    </dependency>
   <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.3</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2-beta</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>0.9.0-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
  </dependencies>
</project>

I am running my spark job as follows:

./bin/spark-submit --class  com.test.spell.spark.App \
    --master local \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue default \
 /home/cwadmin/spark_programs/spell/target/spell-0.0.1-SNAPSHOT.jar 

I feel that the above problem is rising due to wrong jar files usage. Can someone help me out in fixing this. I would like to know what are the right jar files that should be used here. Also it would be grateful if someone shares some valuable resources regarding these programs like integration of Spark and Kafka.

I am trying to fix this issue since 2 days and unable to solve this

Thanks in advance.


Solution

    1. First, you need to use the same version of Spark dependencies - I see that you're using 2.1.0 for spark-core, 2.3.1 for spark-streaming, 2.0.0 for spark-streaming-kafka, etc.
    2. Second - you need to use the same version of Scala for these dependencies, and it should to version of Scala that was used to compile your build of Spark.
    3. Third - you don't need to explicitly specify dependencies for Kafka libraries.
    4. You need to build a fat-jar of your application, that will include necessary libraries (except spark-core that should be marked as provided). The easiest way to do this is to use Maven Assembly plugin.