I have a DataFrame in the following format:
| character | title |
|---|---|
| Tony Stark | ["Iron Man"] |
| James Buchanan Barnes | ["Captain America: The First Avenger","Captain America: The Winter Soldier","Captain America: Civil War","Avengers: Infinity War"] |
| Marcus Bledsoe | ["Captain America: The Winter Soldier"] |
My goal is to create a GraphX representation where my vertices are `Character` and `Title`, with the edges representing when a character has appeared in a movie. This is a sample data set; the real data will be much larger, so the solution must scale across multiple executors.
I'm new to Scala and Spark. My strategy has been to create a `characterVerticesRDD` and a `movieVerticesRDD`, and then combine them.
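I believe this is a correct way to build the `characterVerticesRDD`:

```scala
// stringHash returns an Int; widen it to Long, since VertexId is an alias for Long
val characterVerticesRDD: RDD[(VertexId, String)] =
  df.rdd.map(row => (MurmurHash3.stringHash(row.getString(0)).toLong, row.getString(0)))
```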
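Hashing works here because the id depends only on the name: every executor computes the same id for the same string, with no shared state. A minimal sketch of that idea follows; `VertexIds`, `vertexId` and `collisions` are hypothetical names. `MurmurHash3.stringHash` returns a 32-bit `Int`, which is widened to `Long` (the type GraphX's `VertexId` aliases), so two different names can still collide.

```scala
import scala.util.hashing.MurmurHash3

object VertexIds {
  // Hypothetical helper: GraphX's VertexId is a type alias for Long,
  // while MurmurHash3.stringHash returns an Int, so widen explicitly.
  // Only 32 bits of hash, so two different names can map to the same id.
  def vertexId(name: String): Long = MurmurHash3.stringHash(name).toLong

  // Groups distinct names by id and keeps only ids shared by more than one name,
  // i.e. the hash collisions present in the given sample.
  def collisions(names: Seq[String]): Map[Long, Seq[String]] =
    names.distinct.groupBy(vertexId).filter { case (_, ns) => ns.size > 1 }
}
```

Calling `collisions` on a local sample of real character and movie names is a quick sanity check before relying on 32-bit ids at scale.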
The following is my first naive attempt. I realize now that using a `Set` is invalid, since it can't be shared across executors, and using `collect` is not going to work in a scalable solution either.

```scala
val movieVertices = scala.collection.mutable.Set[(Long, String)]()
df.rdd.collect.foreach(row => {
  row.getAs[EmbeddedList]("title").elements.map { case d: String => d }.toList.foreach(movie => movieVertices += ((MurmurHash3.stringHash(movie), movie)))
})
val movieVerticesRDD: RDD[(VertexId, String)] = sc.parallelize(movieVertices.toList)

// combine vertices
val verticesRDD: RDD[(VertexId, String)] = characterVerticesRDD ++ movieVerticesRDD
```
What is the best way to build this `movieVerticesRDD` given my DataFrame structure? I somehow need to iterate through the movie titles to create the vertices. I assume the strategy would be similar when creating edges, since I'll need to iterate through each row of the DataFrame to create the edges between a character and its movie(s).
Thanks for any guidance.
This should do the trick. Basically, we first create an RDD of distinct vertices with ids (we use `zipWithIndex` to generate them). Then, we create a DataFrame of edges (pairs of vertex names) and join in the previously created ids. Finally, we transform the DataFrame into an RDD and create the graph from the two RDDs.
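To make the first step concrete, the same pipeline can be modeled on plain Scala collections, without Spark. This is an illustration only (`VertexIndexSketch` is a hypothetical name, and the join is modeled as a `Map` lookup): on an RDD, `zipWithIndex` returns `Long` indices and the order after `distinct` is not guaranteed, so the id a given name receives is not something to rely on.

```scala
object VertexIndexSketch {
  val rows: Seq[(String, Seq[String])] = Seq(
    "Tony Stark" -> Seq("Iron Man"),
    "James Buchanan Barnes" -> Seq("Captain America: The First Avenger", "Captain America: The Winter Soldier", "Captain America: Civil War", "Avengers: Infinity War"),
    "Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
  )

  // Mirrors explode(concat(array('character), 'title)) followed by distinct
  val names: Seq[String] =
    rows.flatMap { case (character, titles) => character +: titles }.distinct

  // Mirrors zipWithIndex.map(_.swap): (id, name) pairs
  val vertices: Seq[(Long, String)] =
    names.zipWithIndex.map { case (name, i) => (i.toLong, name) }

  // Models the two joins as a lookup from name to id
  val idOf: Map[String, Long] = vertices.map(_.swap).toMap

  // One (characterId, movieId) pair per (character, title) combination
  val edges: Seq[(Long, Long)] =
    rows.flatMap { case (c, ts) => ts.map(t => (idOf(c), idOf(t))) }
}
```

Here `Tony Stark` gets id 0 and `Iron Man` id 1, so the first edge is `(0, 1)`; each of the eight distinct names appears exactly once even though `Captain America: The Winter Soldier` occurs in two rows.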
```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.functions.{array, concat, explode}
// Assumes a SparkSession named `spark` (as in spark-shell)
import spark.implicits._

// your data
val df = Seq(
  "Tony Stark" -> Seq("Iron Man"),
  "James Buchanan Barnes" -> Seq("Captain America: The First Avenger", "Captain America: The Winter Soldier", "Captain America: Civil War", "Avengers: Infinity War"),
  "Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
).toDF("character", "title")

// Movies and characters are vertices: create an RDD of distinct names and add indices.
// concat on array columns requires Spark 2.4+.
val vertices = df
  .select(explode(concat(array('character), 'title)) as "x")
  .distinct.rdd.map(_.getAs[String](0))
  .zipWithIndex.map(_.swap)

// DataFrame of vertices (same as above)
val vertexDf = vertices.toDF("id", "node")

// DataFrame of edges: one row per (character, title) pair
val edgeDF = df
  .select('character, explode('title) as "title")

// RDD of edges. We need to join the vertex ids that we previously created.
val edges = edgeDF
  .join(vertexDf, edgeDF("character") === vertexDf("node"))
  .select('title, 'id as "character_id")
  .join(vertexDf, edgeDF("title") === vertexDf("node"))
  .rdd
  .map(row => Edge(row.getAs[Long]("character_id"), row.getAs[Long]("id"), None))

// And creating the graph
val graph = Graph(vertices, edges)
```
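If you would rather keep the hashed ids from the question, the joins are not needed, because each row already contains everything required to compute both endpoint ids. The following is a sketch of that alternative, not part of the answer above; it assumes Spark 2.x or later with `spark-graphx` on the classpath, a `title` column of type `array<string>`, and introduces hypothetical names (`HashIdGraph`, `vertexId`, the `"appears in"` edge label). As with any 32-bit hash, distinct names can collide, and GraphX will then treat them as a single vertex.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.util.hashing.MurmurHash3

object HashIdGraph {
  // Hypothetical helper: id derived from the name itself, widened from a 32-bit hash
  def vertexId(name: String): VertexId = MurmurHash3.stringHash(name).toLong

  // Assumes df has columns character: string and title: array<string>
  def build(df: DataFrame): Graph[String, String] = {
    // toList normalizes Row.getSeq's result across Scala 2.12 and 2.13
    val rows: RDD[(String, List[String])] =
      df.rdd.map(row => (row.getString(0), row.getSeq[String](1).toList))

    // Every executor hashes its own rows; distinct() removes repeated movies (one shuffle)
    val vertices: RDD[(VertexId, String)] = rows
      .flatMap { case (character, titles) => (character :: titles).map(n => (vertexId(n), n)) }
      .distinct()

    // One edge per (character, title) pair, computed row by row without joins
    val edges: RDD[Edge[String]] = rows.flatMap { case (character, titles) =>
      titles.map(t => Edge(vertexId(character), vertexId(t), "appears in"))
    }

    Graph(vertices, edges)
  }
}
```

With the `df` from the answer, `HashIdGraph.build(df)` should yield the same vertex names and the same six edges as the join-based version (assuming no collisions), only with different id values. The trade-off is collision risk against the shuffles the two joins require.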