I'm trying to get the average number of words and characters per tweet in each RDD I pull from the Twitter API using Spark Streaming in Java 8. However, I'm having trouble using streams to achieve this. My code is as follows:
//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
    long c = rdd.count();
    long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
    long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
    long avgWc = wc / c;
    long avgCc = cc / c;
    System.out.println(avgWc);
    System.out.println(avgCc);
    return avgWc, avgCc;});
The error I'm getting is that foreachRDD expects a void return type, but I'm returning a long. How can I get around this? Is there another way I should approach this?
A possible solution is to use JavaDStream.transform. This function lets you stay within the Spark Streaming API:
JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
    long c = rdd.count();
    if (c == 0) {
        // Guard against empty batches: reduce would fail and c would divide by zero.
        return jssc.sparkContext().<Tuple2<Long, Long>>emptyRDD();
    }
    long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
    long cc = rdd.map(String::length).reduce(Integer::sum); // s.length() counts characters directly
    long avgWc = wc / c;
    long avgCc = cc / c;
    return jssc.sparkContext().parallelize(Collections.singletonList(new Tuple2<>(avgWc, avgCc)));
});
avgs.print();
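The averaging arithmetic itself is easy to sanity-check outside Spark. Here is a minimal plain-Java sketch of the same per-batch computation; computeAverages is a hypothetical helper for illustration, not part of the Spark API:

```java
import java.util.List;

// Plain-Java sketch of the per-batch averaging done in the streaming code above.
public class TweetStats {
    // Returns {average word count, average character count} over the given
    // tweets, using integer division as in the streaming code.
    static long[] computeAverages(List<String> tweets) {
        long c = tweets.size();
        if (c == 0) {
            return new long[] {0, 0}; // avoid division by zero on an empty batch
        }
        long wc = tweets.stream().mapToLong(s -> s.split(" ").length).sum();
        long cc = tweets.stream().mapToLong(String::length).sum();
        return new long[] {wc / c, cc / c};
    }
}
```

For example, two tweets of 2 and 4 words average to 3 words; this is the same result the transform-based version would emit per batch.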