apache-spark apache-spark-sql spark-graphx

Apache Spark GraphX - create VertexRDD from SQL table


I have a table that I'm loading into a DataFrame in Spark. It has the following schema:

verticesDf.printSchema

root
 |-- id: integer (nullable = true)
 |-- target: string (nullable = true)
 |-- batch_id: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- size: double (nullable = true)

How can I transform it into a VertexRDD so that I can later build a Graph from it?

I was trying the following:

case class SRow( target:String, batch_id:Double, x:Double, y:Double, z:Double, size:Double)
val sourceDS: Dataset[(VertexId, SRow)] = verticesDf.as[(VertexId, SRow)]
val vertVX=VertexRDD(sourceDS)

but neither this nor the many other variants I tried works; I always get some kind of type mismatch. What is the proper way?


Solution

  • To create a graph you need at least two RDDs. One, of type RDD[(VertexId, VD)], contains the vertices: a VertexId is just a Long, and VD can be any type, your SRow class for instance. The other, of type RDD[Edge[ED]], contains the edges, where ED, like VD, can be any type.

    Your question is about creating the vertex RDD. You are trying to convert your dataframe directly to a Dataset[(VertexId, SRow)], which fails for two reasons: id is an integer rather than a long, and the structure does not match, because the dataframe has seven flat columns while the target type is a pair of an id and a nested SRow.

    Here is how to do it:

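    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.functions.{col, struct}
    import spark.implicits._ // encoders for .as[...]; `spark` is the active SparkSession

    val vertices = verticesDf
        .select(
           // cast the id to a long, since VertexId is an alias for Long
           col("id").cast("long"),
           // pack the remaining columns (everything after id) into a struct
           // whose fields map onto SRow by name
           struct(verticesDf.columns.tail.map(col) : _*))
        .as[(Long, SRow)]
    
    // we also need edges; here is a dummy RDD with a single edge
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "test")))
    
    // build the graph; Graph expects RDDs, hence vertices.rdd
    val graph: Graph[SRow,String] = Graph(vertices.rdd, edges)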
    

    Note on the last line that a Graph is built from RDDs, not Datasets, which is why the vertices are converted with vertices.rdd.
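
If you want an explicit VertexRDD rather than a plain RDD of pairs, one option is to map over the DataFrame's rows yourself and pass the result to VertexRDD(...). The following is only a minimal sketch: it assumes a local SparkSession, uses two made-up sample rows in place of the SQL table, and assumes no column contains nulls. SRow is the case class from the question, so batch_id is widened from Int to Double.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Vertex attribute type, as defined in the question
case class SRow(target: String, batch_id: Double, x: Double, y: Double, z: Double, size: Double)

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("vertex-rdd-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data with the same column names and types as the question's schema
val verticesDf = Seq(
  (1, "a", 10, 0.0, 0.0, 0.0, 1.0),
  (2, "b", 10, 1.0, 2.0, 3.0, 2.0)
).toDF("id", "target", "batch_id", "x", "y", "z", "size")

// Turn each Row into an (id, attribute) pair; the id is widened from Int to Long
val vertexPairs: RDD[(VertexId, SRow)] = verticesDf.rdd.map { row =>
  val attr = SRow(
    row.getAs[String]("target"),
    row.getAs[Int]("batch_id").toDouble,
    row.getAs[Double]("x"),
    row.getAs[Double]("y"),
    row.getAs[Double]("z"),
    row.getAs[Double]("size"))
  (row.getAs[Int]("id").toLong, attr)
}

// Build the VertexRDD from the pair RDD
val vertexRdd: VertexRDD[SRow] = VertexRDD(vertexPairs)

// Dummy edge, as in the answer above
val edges: RDD[Edge[String]] = spark.sparkContext.parallelize(Seq(Edge(1L, 2L, "test")))

// A VertexRDD[SRow] is itself an RDD[(VertexId, SRow)], so Graph accepts it directly
val graph: Graph[SRow, String] = Graph(vertexRdd, edges)
```

Graph(...) also accepts the plain pair RDD, and graph.vertices returns a VertexRDD either way, so the explicit VertexRDD(...) call is only needed if you want that type before the graph exists.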