Tags: scala, apache-spark, spark-graphx

Convert a Spark DataFrame containing an embedded list into an RDD in Scala


I have a DataFrame in the following format:

character              title
Tony Stark             ["Iron Man"]
James Buchanan Barnes  ["Captain America: The First Avenger","Captain America: The Winter Soldier","Captain America: Civil War","Avengers: Infinity War"]
Marcus Bledsoe         ["Captain America: The Winter Soldier"]

My goal is to build a GraphX graph whose vertices are characters and titles, with an edge wherever a character has appeared in a movie. This is a sample data set; the real data will be much larger, so the solution must scale across multiple executors.

I'm new to Scala and Spark. My strategy has been to create a characterVerticesRDD and a movieVerticesRDD, and then combine them.

I believe this is a correct way to build the characterVerticesRDD:

val characterVerticesRDD: RDD[(VertexId, String)] = df.rdd.map(row => (MurmurHash3.stringHash(row.getString(0)), row.getString(0)))

The following is my first naive attempt. I realize now that it won't scale: the mutable Set lives on the driver and can't be shared across executors, and collect pulls every row back to the driver.

val movieVertices = scala.collection.mutable.Set[(Long, String)]()
df.rdd.collect.foreach(row => {
    row.getAs[EmbeddedList]("title").elements.map { case d: String => d }.toList.foreach(movie => movieVertices += ((MurmurHash3.stringHash(movie), movie)))
})
val movieVerticesRDD: RDD[(VertexId, String)] = sc.parallelize(movieVertices.toList)

// combine vertices
val verticesRDD: RDD[(VertexId, String)] = characterVerticesRDD ++ movieVerticesRDD

What is the best way to build this movieVerticesRDD given my DataFrame structure? I somehow need to iterate through the movie titles to create the vertices. I assume the strategy would be similar when creating edges since I'll need to iterate through each row of the data frame to create the edge between character and movie(s).

Thanks for any guidance.


Solution

  • This should do the trick. First, we build an RDD of distinct vertices (characters and titles together) and give each one an id with zipWithIndex. Then we build a DataFrame of edges, one (character, title) pair per row, and join it against the vertex ids twice to resolve both endpoints. Finally, we convert that DataFrame into an RDD of Edge objects and create the graph from the two RDDs.

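    // Assumes a SparkSession named `spark` is in scope (as in spark-shell)
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.functions.{array, concat, explode}
    import spark.implicits._

    // your data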
    val df = Seq(
        "Tony Stark" -> Seq("Iron Man"),
        "James Buchanan Barnes" -> Seq("Captain America: The First Avenger","Captain America: The Winter Soldier","Captain America: Civil War","Avengers: Infinity War"),
        "Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
    ).toDF("character", "title")
    
    // Movies and characters are both vertices: build an RDD of distinct names and give each a unique Long id
    val vertices = df
        .select(explode(concat(array('character), 'title)) as "x")
        .distinct.rdd.map(_.getAs[String](0))
        .zipWithIndex.map(_.swap)
    // DataFrame view of the same vertices, used to look up ids in the joins below
    val vertexDf = vertices.toDF("id", "node")
    // DataFrame of edges: one (character, title) row per appearance
    val edgeDF = df
        .select('character, explode('title) as "title")
    // RDD of edges. We join against the vertex ids created above, once for each endpoint.
    val edges = edgeDF
        .join(vertexDf, edgeDF("character") === vertexDf("node"))
        .select('title, 'id as "character_id")
        .join(vertexDf, edgeDF("title") === vertexDf("node"))
        .rdd
        .map(row => Edge(row.getAs[Long]("character_id"), row.getAs[Long]("id"), None))
    // Create the graph from the vertex and edge RDDs
    val graph = Graph(vertices, edges)
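
To see what each step above produces, here is a rough sketch that traces the same logic on plain Scala collections, with no Spark involved. It is only an illustration for inspecting the intermediate values on the sample data, not a replacement for the distributed version. The names LocalGraphSketch, rows and idOf are made up for this example, and the Map lookup stands in for the two joins against vertexDf.

```scala
// Plain-Scala analogue of the Spark pipeline above; no Spark dependency.
// All names here (LocalGraphSketch, rows, idOf) are hypothetical.
object LocalGraphSketch {
  // Same sample data as the DataFrame: character -> titles
  val rows: Seq[(String, Seq[String])] = Seq(
    "Tony Stark" -> Seq("Iron Man"),
    "James Buchanan Barnes" -> Seq(
      "Captain America: The First Avenger",
      "Captain America: The Winter Soldier",
      "Captain America: Civil War",
      "Avengers: Infinity War"),
    "Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
  )

  // Mirrors explode(concat(array(character), title)), distinct, zipWithIndex, swap
  val vertices: Seq[(Long, String)] =
    rows.flatMap { case (character, titles) => character +: titles }
      .distinct
      .zipWithIndex
      .map { case (name, idx) => (idx.toLong, name) }

  // Stand-in for the joins against vertexDf: look up an id by vertex name
  val idOf: Map[String, Long] = vertices.map(_.swap).toMap

  // Mirrors explode(title), then resolves both endpoints to their ids
  val edges: Seq[(Long, Long)] =
    for {
      (character, titles) <- rows
      title <- titles
    } yield (idOf(character), idOf(title))

  def main(args: Array[String]): Unit = {
    vertices.foreach(println)
    edges.foreach(println)
  }
}
```

Because the names are deduplicated before ids are assigned, "Captain America: The Winter Soldier" gets a single vertex, and the edges from both James Buchanan Barnes and Marcus Bledsoe point at it.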