scala | apache-spark | dataframe | graph | spark-graphx

How to build a graph from a DataFrame? (GraphX)


I'm new to Scala and Spark, and I need to build a graph from a DataFrame. This is the structure of my DataFrame, where S and O are the nodes and column P represents the edges:

+---------------------------+---------------------+----------------------------+
|S                          |P                    |O                           |
+---------------------------+---------------------+----------------------------+
|http://website/Jimmy_Carter|http://web/name      |James Earl Carter           |
|http://website/Jimmy_Car   |http://web/country   |http://website/United_States|
|http://website/Jimmy_Car   |http://web/birthPlace|http://web/Georgia_(US)     |
+---------------------------+---------------------+----------------------------+

This is the code that builds the DataFrame; I want to create a graph from the DataFrame "dfA":

 import scala.util.Try
 import spark.implicits._ // for .toDF()

 // One triple per line: subject, predicate, object
 case class Triple(Subject: Option[String], Predicate: Option[String], Object: Option[String])

 val test = sc
     .textFile("testfile.ttl")
     .map(_.split(" "))
     .map(p => Triple(Try(p(0)).toOption, // split already yields Strings
                      Try(p(1)).toOption,
                      Try(p(2)).toOption))
     .toDF()

  val url_regex = """^(?:"|<{1}\s?)(.*)(?:>(?:\s\.)?|,\s.*)$"""
  val dfA = test
      .withColumn("Subject", regexp_extract($"Subject", url_regex, 1))
      .withColumn("Predicate", regexp_extract($"Predicate", url_regex, 1))
      .withColumn("Object", regexp_extract($"Object", url_regex, 1))
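
As a side note, the behavior of url_regex can be checked in plain Scala, outside Spark. This is a minimal sketch (the object and method names are illustrative): the pattern strips the angle brackets of IRI terms like `<http://...>`, optionally followed by the Turtle end-of-statement " .".

```scala
object UrlRegexDemo {
  // The same pattern passed to regexp_extract above
  val urlRegex = """^(?:"|<{1}\s?)(.*)(?:>(?:\s\.)?|,\s.*)$""".r

  // Returns the captured group, i.e. the IRI without its angle brackets
  def extract(field: String): Option[String] =
    urlRegex.findFirstMatchIn(field).map(_.group(1))

  def main(args: Array[String]): Unit = {
    println(extract("<http://website/Jimmy_Carter>")) // Some(http://website/Jimmy_Carter)
    println(extract("<http://web/name> ."))           // Some(http://web/name)
  }
}
```

Note that regexp_extract in Spark returns an empty string when the pattern does not match, whereas this sketch returns None.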

Solution

  • To create a GraphX graph, you need to extract the vertices from your dataframe and associate them with IDs. Then you need to extract the edges (2-tuples of vertex IDs plus metadata) using those IDs. And all of that needs to be in RDDs, not dataframes.

    In other words, you need an RDD[(VertexId, X)] for the vertices and an RDD[Edge[Y]] for the edges, where X is the vertex metadata and Y the edge metadata (each Edge holds a source VertexId, a destination VertexId, and a Y attribute). Note that VertexId is just an alias for Long.

    In your case, with "S" and "O" the vertex columns and "P" the edge column, it would go as follows.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.sql.functions.{array, explode}
    import spark.implicits._ // for the 'colName symbol syntax and toDF

    // Let's create the vertex RDD.
    val vertices : RDD[(VertexId, String)] = df
        .select(explode(array('S, 'O))) // S and O are the vertices
        .distinct // we remove duplicates
        .rdd.map(_.getAs[String](0)) // transform to RDD
        .zipWithIndex // associate a long index to each vertex
        .map(_.swap)
    
    // Now let's define a vertex dataframe, because joins are clearer in Spark SQL
    val vertexDf = vertices.toDF("id", "node")
    
    // And let's extract the edges and join their vertices with their respective IDs
    val edges : RDD[Edge[String]] = df
        .join(vertexDf, df("S") === vertexDf("node")) // getting the IDs for "S"
        .select('P, 'O, 'id as 'idS)
        .join(vertexDf, df("O") === vertexDf("node")) // getting the IDs for "O"
        .rdd.map(row => // creating the edge using column "P" as metadata 
          Edge(row.getAs[Long]("idS"), row.getAs[Long]("id"), row.getAs[String]("P")))
    
    // And finally
    val graph = Graph(vertices, edges)
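
To see what the indexing step produces, the same distinct-then-zipWithIndex logic can be mimicked with plain Scala collections on toy triples modeled after the table above (`VertexIndexDemo` and its members are illustrative names, not part of the GraphX API):

```scala
object VertexIndexDemo {
  // Toy (S, P, O) triples, modeled after the dataframe rows
  val triples = Seq(
    ("http://website/Jimmy_Carter", "http://web/name", "James Earl Carter"),
    ("http://website/Jimmy_Car", "http://web/country", "http://website/United_States")
  )

  // Distinct vertices (the S and O columns), each paired with a Long ID,
  // mirroring .distinct.zipWithIndex in the RDD version
  val vertexIds: Map[String, Long] =
    triples.flatMap { case (s, _, o) => Seq(s, o) }
      .distinct
      .zipWithIndex
      .map { case (v, i) => (v, i.toLong) }
      .toMap

  // Edges as (srcId, dstId, predicate), mirroring the joins on vertex IDs
  val edges: Seq[(Long, Long, String)] =
    triples.map { case (s, p, o) => (vertexIds(s), vertexIds(o), p) }
}
```

Running this gives four vertex IDs and two edges whose attribute is the predicate P, which is exactly the shape that Graph(vertices, edges) expects once both sides are RDDs.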