
Convert a JavaRDD<Tuple2<Object, long[]>> into a Spark Dataset<Row> in Java


In Java (not Scala!) with Spark 3.0.1, I have a JavaRDD object neighborIdsRDD whose type is JavaRDD<Tuple2<Object, long[]>>.

Part of my code related to the generation of the JavaRDD is the following:

GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();

I had to obtain a JavaRDD with toJavaRDD() because collectNeighborIds returns an org.apache.spark.graphx.VertexRDD<long[]> object (VertexRDD doc).

However, the rest of my application needs a Spark Dataset<Row> built from the result of collectNeighborIds.

What are the possible ways, and which is the best one, to convert a JavaRDD<Tuple2<Object, long[]>> into a Dataset<Row>?


Update from comments:

I adjusted the code based on the comments:

        GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
        JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
        System.out.println("VertexRDD neighborIdsRDD is:");
        // Collect once; calling collect() inside the loop would run a Spark job on every iteration
        for (Tuple2<Object, long[]> t : neighborIdsRDD.collect()) {
            System.out.println(t._1() + " -- " + Arrays.toString(t._2()));
        }

        Dataset<Row> dr = spark_session.createDataFrame(neighborIdsRDD.rdd(), Tuple2.class);
        System.out.println("converted Dataset<Row> is:");
        dr.show();

but I get a Dataset whose rows have no columns, as follows:

VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2, 3]
5 -- [3, 2]
2 -- [1, 3, 5]
3 -- [1, 2, 5, 4]
converted Dataset<Row> is:
++
||
++
||
||
||
||
||
++

Solution

  • I was in the same situation, but fortunately I found a way to get a DataFrame back.

    The relevant steps are marked [1], [2] and [3] in the code comments.

    GraphOps<String, String> graphOps = new GraphOps<>(graph, stringTag, stringTag);
    System.out.println("VertexRDD neighborIdsRDD is:");
    JavaRDD<Tuple2<Object, long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
    // Collect once; calling collect() inside the loop would run a Spark job on every iteration
    for (Tuple2<Object, long[]> t : neighborIdsRDD.collect()) {
        System.out.println(t._1() + " -- " + Arrays.toString(t._2()));
    }
    
    // [1] Define encoding schema
    StructType graphStruct =  new StructType(new StructField[]{
            new StructField("father", DataTypes.LongType, false, Metadata.empty()),
            new StructField("children", DataTypes.createArrayType(DataTypes.LongType), false, Metadata.empty()),
    });
    
    // [2] Build a JavaRDD<Row> from a JavaRDD<Tuple2<Object,long[]>>
    JavaRDD<Row> dr = neighborIdsRDD.map(tuple -> RowFactory.create(tuple._1(), tuple._2()));
            
    // [3] Finally, build the required Dataset<Row>
    Dataset<Row> dsr = spark_session.createDataFrame(dr.rdd(), graphStruct);
    
    System.out.println("DATASET IS:");
    dsr.show();
    

    Printed output:

    VertexRDD neighborIdsRDD is:
    4 -- [3]
    1 -- [2, 3]
    5 -- [3, 2]
    2 -- [1, 3, 5]
    3 -- [1, 2, 5, 4]
    DATASET IS:
    +------+------------+
    |father|    children|
    +------+------------+
    |     4|         [3]|
    |     1|      [2, 3]|
    |     5|      [3, 2]|
    |     2|   [1, 3, 5]|
    |     3|[1, 2, 5, 4]|
    +------+------------+
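
A possible explanation for the column-less output in the question: createDataFrame(rdd, SomeClass.class) is documented as applying a schema to an RDD of Java Beans, so it presumably derives columns from getX() style getters. scala.Tuple2 exposes its fields as _1() and _2(), which are not bean getters, so no columns would be found. The sketch below uses only the JDK to illustrate that idea; TupleLike, NeighborBean and readableProperties are hypothetical names made up for this example, and TupleLike is only a stand-in for Tuple2, not the real class. It does not reproduce Spark's own inference code.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical stand-in for scala.Tuple2: the accessors are named _1() and _2(),
    // which do not follow the getX() JavaBean naming convention.
    public static final class TupleLike {
        private final Object first;
        private final long[] second;

        public TupleLike(Object first, long[] second) {
            this.first = first;
            this.second = second;
        }

        public Object _1() { return first; }
        public long[] _2() { return second; }
    }

    // Hypothetical bean with conventional getters and setters.
    public static final class NeighborBean {
        private long father;
        private long[] children;

        public long getFather() { return father; }
        public void setFather(long father) { this.father = father; }
        public long[] getChildren() { return children; }
        public void setChildren(long[] children) { this.children = children; }
    }

    // Returns the sorted names of the readable JavaBean properties of a class,
    // skipping the implicit "class" property inherited from Object.getClass().
    public static List<String> readableProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(readableProperties(TupleLike.class));    // prints []
        System.out.println(readableProperties(NeighborBean.class)); // prints [children, father]
    }
}
```

If that reading is right, mapping each tuple to a bean like NeighborBean and passing the bean class to createDataFrame might be an alternative to the explicit StructType above, though I have not verified that variant against GraphX output.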