apache-spark java-8 spark-streaming twitter-streaming-api

Using foreachRDD to get the average number of words & characters for each RDD in Java 8 Spark Streaming of the Twitter API


I'm trying to get the average number of words and characters in each RDD I pull from the Twitter API using Spark in Java 8. However, I'm having trouble using streams to achieve this. My code is as follows:

//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
        return avgWc, avgCc;});

The error I'm getting is that foreachRDD expects a void return type, while my lambda tries to return long values.

How can I get around this? Is there another way I need to approach this?


Solution

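  • A possible solution is to use JavaDStream.transform. This function lets you stay within the Spark Streaming API: it maps each RDD of the stream to a new RDD, so the averages remain available as a DStream instead of being returned from foreachRDD: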

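    JavaDStream<String> statuses = ...
    // transform turns each batch RDD into a new RDD, so the result is again a DStream.
    JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
                long c = rdd.count();
                // An empty batch would make reduce throw and the division fail, so emit nothing for it.
                if (c == 0) {
                    return jssc.sparkContext().emptyRDD();
                }
                long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
                long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
                // Integer division: both averages are truncated to whole numbers.
                long avgWc = wc / c;
                long avgCc = cc / c;
                //System.out.println(wc / c);
                //System.out.println(cc / c);
                // Wrap the pair of averages in a single-element RDD for this batch.
                return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc, avgCc)));
            }
    );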
    avgs.print();
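
The per-batch arithmetic itself can be checked without a Spark cluster. The following is a minimal sketch in plain Java, assuming a batch is available as a List<String>; the class TweetBatchStats and its method of are hypothetical names introduced for illustration and are not part of Spark or of the code above. Unlike the snippet above, it keeps fractional averages by using double, splits on runs of whitespace, and returns zeros for an empty batch, which are design assumptions rather than requirements from the question.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Spark. */
final class TweetBatchStats {
    final double avgWords;
    final double avgChars;

    private TweetBatchStats(double avgWords, double avgChars) {
        this.avgWords = avgWords;
        this.avgChars = avgChars;
    }

    /** Computes per-tweet averages for one batch; an empty batch yields zeros (an assumption). */
    static TweetBatchStats of(List<String> tweets) {
        if (tweets.isEmpty()) {
            // Avoid dividing by zero when no tweets arrived in this batch.
            return new TweetBatchStats(0.0, 0.0);
        }
        long words = 0;
        long chars = 0;
        for (String tweet : tweets) {
            String trimmed = tweet.trim();
            // Split on runs of whitespace so double spaces do not count as extra words.
            words += trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
            // length() counts UTF-16 code units of the original text, spaces included.
            chars += tweet.length();
        }
        // Cast to double before dividing so the averages are not truncated.
        return new TweetBatchStats((double) words / tweets.size(),
                                   (double) chars / tweets.size());
    }

    public static void main(String[] args) {
        TweetBatchStats stats = TweetBatchStats.of(Arrays.asList("hello world", "one two  three"));
        System.out.println(stats.avgWords + " " + stats.avgChars); // prints "2.5 12.5"
    }
}
```

Logic like this could be applied to the collected contents of a small batch on the driver, or mirrored with map and reduce on the RDD as in the transform example; which fits better depends on batch size.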