
GraphFrames: Merge edge nodes with similar column values


tl;dr: How do you simplify a graph, removing edge nodes with identical name values?

I have a graph defined as follows:

import graphframes
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
vertices = spark.createDataFrame([
    ('1', 'foo', '1'),
    ('2', 'bar', '2'),
    ('3', 'bar', '3'),
    ('4', 'bar', '5'),
    ('5', 'baz', '9'),
    ('6', 'blah', '1'),
    ('7', 'blah', '2'),
    ('8', 'blah', '3')
], ['id', 'name', 'value'])

edges = spark.createDataFrame([
    ('1', '2'),
    ('1', '3'),
    ('1', '4'),
    ('1', '5'),
    ('5', '6'),
    ('5', '7'),
    ('5', '8')
], ['src', 'dst'])

f = graphframes.GraphFrame(vertices, edges)

This produces a graph that looks like this (the numbers are vertex IDs):

[Image: the original graph]

Starting from the vertex with ID 1, I'd like to simplify the graph so that nodes with the same name value are coalesced into a single node. The resulting graph would look something like this:

[Image: the simplified graph]

Notice how we only have one foo (ID 1), one bar (ID 2), one baz (ID 5) and one blah (ID 6). The value column is irrelevant; it is only there to show that each vertex is unique.

I attempted to implement a solution, but it is hacky and extremely inefficient, and I'm certain there is a better way (I also don't think it works):

f = graphframes.GraphFrame(vertices, edges)

# Get the out degrees for our nodes. Nodes that do not appear in
# this dataframe have zero out degrees.
outs = f.outDegrees

# Merge this with our nodes.
vertices = f.vertices
vertices = f.vertices.join(outs, outs.id == vertices.id, 'left').select(vertices.id, 'name', 'value', 'outDegree')
vertices.show()

# Create a new graph with our out degree nodes.
f = graphframes.GraphFrame(vertices, edges)

# Find paths to all edge vertices from our vertex ID = 1
# Can we make this one operation instead of two??? What if we have more than two hops?
one_hop = f.find('(a)-[e]->(b)').filter('b.outDegree is null').filter('a.id == "1"')
one_hop.show()

two_hop = f.find('(a)-[e1]->(b); (b)-[e2]->(c)').filter('c.outDegree is null').filter('a.id == "1"')
two_hop.show()

# Super ugly, but union the vertices from `one_hop` and `two_hop` above, then
# deduplicate on the name.
vertices = one_hop.select('a.*').union(one_hop.select('b.*'))
vertices = vertices.union(two_hop.select('a.*').union(two_hop.select('b.*').union(two_hop.select('c.*'))))
vertices = vertices.dropDuplicates(['name'])
vertices.show()

# Do the same for the edges
edges = two_hop.select('e1.*').union(two_hop.select('e2.*')).union(one_hop.select('e.*')).distinct()

# We need to ensure that both endpoints of each edge still exist. We do this by
# checking that the referenced vertex ID is in our `vertices` for both the `src`
# and the `dst` columns. This does NOT seem to work as I'd expect!
edges = edges.join(vertices, vertices.id == edges.src, "left").select("src", "dst")
edges = edges.join(vertices, vertices.id == edges.dst, "left").select("src", "dst")
edges.show()

Is there an easier way to remove nodes (and their corresponding edges) so that edge (leaf) nodes are unique by name?
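Two points in the attempt above can be illustrated without Spark. First, the `find` patterns only cover a fixed number of hops, whereas a breadth-first search collects every vertex reachable from vertex 1 at any depth. Second, a `'left'` join keeps every row of the left DataFrame whether or not it finds a match, so the last two joins never drop an edge; filtering requires an inner join or `how='left_semi'`. The following is a minimal pure-Python sketch over the toy data, not GraphFrames code: `reachable_from` is a hypothetical helper, and keeping the smallest id (in string order) per name is an arbitrary stand-in for `dropDuplicates(['name'])`.

```python
from collections import deque

# Toy copy of the question's data (id -> name, and directed src -> dst edges).
names = {'1': 'foo', '2': 'bar', '3': 'bar', '4': 'bar',
         '5': 'baz', '6': 'blah', '7': 'blah', '8': 'blah'}
edges = [('1', '2'), ('1', '3'), ('1', '4'), ('1', '5'),
         ('5', '6'), ('5', '7'), ('5', '8')]


def reachable_from(start, edges):
    """Hypothetical helper: return every vertex reachable from `start`
    (any number of hops) plus the adjacency lists built from `edges`."""
    children = {}
    for src, dst in edges:
        children.setdefault(src, []).append(dst)
    seen = {start}
    queue = deque([start])
    while queue:
        vertex = queue.popleft()
        for child in children.get(vertex, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen, children


reached, children = reachable_from('1', edges)

# Leaves are reachable vertices with no outgoing edges
# (what the 'outDegree is null' filter selects in the question).
leaves = {v for v in reached if v not in children}

# Keep one vertex per name; the smallest id in string order wins (arbitrary).
representative = {}
for v in sorted(reached):
    representative.setdefault(names[v], v)
kept_ids = set(representative.values())

# Semi-join semantics: an edge survives only if BOTH endpoints were kept.
# A left join would keep every edge instead.
kept_edges = [(s, d) for s, d in edges if s in kept_ids and d in kept_ids]

print(sorted(kept_ids))  # ['1', '2', '5', '6']
print(kept_edges)        # [('1', '2'), ('1', '5'), ('5', '6')]
```

In PySpark the last step corresponds to something like `edges.join(vertices, edges.src == vertices.id, 'left_semi')` followed by the same kind of join on `dst`; a semi join returns only the left side's columns, so no extra `select` is needed.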


Solution

  • Why don't you simply treat the name column as the new id?

    import graphframes
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    vertices = spark.createDataFrame([
        ('1', 'foo', '1'),
        ('2', 'bar', '2'),
        ('3', 'bar', '3'),
        ('4', 'bar', '5'),
        ('5', 'baz', '9'),
        ('6', 'blah', '1'),
        ('7', 'blah', '2'),
        ('8', 'blah', '3')
    ], ['id', 'name', 'value'])
    
    edges = spark.createDataFrame([
        ('1', '2'),
        ('1', '3'),
        ('1', '4'),
        ('1', '5'),
        ('5', '6'),
        ('5', '7'),
        ('5', '8')
    ], ['src', 'dst'])
    
    # Create a DataFrame with a single column: each distinct name becomes a vertex id
    new_vertices = vertices.select(vertices.name.alias('id')).distinct()
    
    # Replace the src ids with the corresponding names
    new_edges = edges.join(vertices, edges.src == vertices.id, 'left')
    new_edges = new_edges.select(new_edges.dst, new_edges.name.alias('src'))
    
    # Replace the dst ids with the corresponding names
    new_edges = new_edges.join(vertices, new_edges.dst == vertices.id, 'left')
    new_edges = new_edges.select(new_edges.src, new_edges.name.alias('dst'))
    
    # Several original edges now map to the same (src, dst) pair; drop the duplicates
    new_edges = new_edges.dropDuplicates(['src', 'dst'])
    
    new_edges.show()
    new_vertices.show()
    
    f = graphframes.GraphFrame(new_vertices, new_edges)
    

    Output:

    +---+----+
    |src| dst|
    +---+----+
    |foo| baz|
    |foo| bar|
    |baz|blah|
    +---+----+
    
    +----+
    |  id|
    +----+
    |blah|
    | bar|
    | foo|
    | baz|
    +----+
    

    [Image: the resulting graph]