
Sorting a JavaDStream - Spark Streaming


I have an application that works with JavaDStream objects. Here is a piece of code where I compute how often each word appears:

JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
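For reference, within a single micro-batch the mapToPair/reduceByKey pipeline above pairs every word with 1 and then sums the 1s per word. The following is a plain-Java sketch of that per-batch computation, not Spark code; the class `BatchWordCount` and method `countWords` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchWordCount {
    // Hypothetical helper: a plain-Java equivalent of what
    // words.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((i1, i2) -> i1 + i2)
    // computes for the words of ONE micro-batch (no Spark involved).
    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : words) {
            // merge() stores 1 for a new word, otherwise adds 1 to its count,
            // just as reduceByKey sums the 1s emitted for the same key
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("spark", "java", "spark", "stream", "spark", "java");
        System.out.println(countWords(batch));
    }
}
```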

Now, if I want to print the top N most frequent words, sorted by their Integer count, how can I do this when JavaPairDStream has no method like sortByKey (which JavaPairRDD provides)?


Solution

  • Since you have a JavaPairDStream<String, Integer> and want to sort by the Integer value, you first have to swap each pair so that the count becomes the key:

    JavaPairDStream<Integer,String> swappedPair = wordCounts.mapToPair(x -> x.swap());
    

    Now you can sort each batch with transformToPair, which applies a function to the JavaPairRDD behind every batch, and call sortByKey on that RDD:

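    // Function here is org.apache.spark.api.java.function.Function, not java.util.function.Function
    JavaPairDStream<Integer,String> sortedStream = swappedPair.transformToPair(
        new Function<JavaPairRDD<Integer,String>, JavaPairRDD<Integer,String>>() {
            @Override
            public JavaPairRDD<Integer,String> call(JavaPairRDD<Integer,String> jPairRDD) throws Exception {
                // false = descending order, so the most frequent words come first
                return jPairRDD.sortByKey(false);
            }
        });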
    
    // print() shows the first 10 records of each batch; use print(n) to show the top n
    sortedStream.print();
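To see what the swap, sortByKey(false) and print steps do to a single batch, here is a plain-Java sketch that applies the same steps to an in-memory map of word counts. It is an illustration under that assumption, not Spark code; `TopNPerBatch` and `topN` are hypothetical names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TopNPerBatch {
    // Hypothetical helper: reproduces, for ONE batch held in memory, the steps
    // swap (word, count) -> (count, word), sort by key descending, keep the first n.
    static List<Map.Entry<Integer, String>> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                // swap each pair so the count becomes the key
                .<Map.Entry<Integer, String>>map(e -> new SimpleEntry<>(e.getValue(), e.getKey()))
                // like sortByKey(false): order by key (the count), largest first;
                // the order among equal counts depends on the map's iteration order
                .sorted(Map.Entry.<Integer, String>comparingByKey().reversed())
                // keep only the n most frequent words
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("spark", 5);
        counts.put("java", 2);
        counts.put("stream", 9);
        counts.put("rdd", 1);
        for (Map.Entry<Integer, String> e : topN(counts, 2)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Running main prints `9 stream` followed by `5 spark`. Outside Spark you could simply sort by value; the swap is kept only to mirror the DStream solution step by step.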