Tags: apache-spark, pyspark, graph, graphframes

Using a module's method in a PySpark map


I have heard that it is possible to call a method from another Python module to perform calculations that are not implemented in Spark, although of course doing so is inefficient. I need a method to compute the eigenvector centrality of a graph (it is not available in the graphframes module). I am aware that there is a way to do this in Scala using sparkling graph, but I need Python so that everything stays in one place. I am new to Spark RDDs, and I am wondering what is wrong with the code below, or even whether this is a proper way of doing this.

import networkx as nx

def func1(dt):
    G = nx.Graph()
    src = dt.Provider
    dest = dt.AttendingPhysician
    gr = src.zip(dest)
    G = nx.from_edgelist(gr)
    deg =nx.eigenvector_centrality(G)
    return deg

rdd2 = inpatient.rdd.map(lambda x: func1(x))
rdd2.collect()

inpatient is a DataFrame read from a CSV file, from which I want to build a graph directed from the nodes in the Provider column to the nodes in the AttendingPhysician column.

This is the error I am encountering:

AttributeError: 'str' object has no attribute 'zip' 

Solution

  • What you need is to understand the so-called user-defined function (UDF) functionality. However, plain Python UDFs are not very efficient.
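
    As an aside, the error in the question occurs because rdd.map calls func1 on one Row at a time, so dt.Provider is a single string value rather than a whole column, and a Python str has no zip method. A minimal sketch of what map actually passes (the example values are illustrative):

    # each element of inpatient.rdd is a single Row, not a column of values
    row = inpatient.rdd.first()   # e.g. Row(Provider='PRV001', AttendingPhysician='PHY001')
    print(type(row.Provider))     # <class 'str'>, hence "'str' object has no attribute 'zip'"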

    You can use UDFs efficiently via the provided pandas_udf GROUPED_MAP functionality.

    From the documentation:

    Grouped map Pandas UDFs first splits a Spark DataFrame into groups based on the conditions specified in the groupby operator, applies a user-defined function (pandas.DataFrame -> pandas.DataFrame) to each group, combines and returns the results as a new Spark DataFrame.
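
    Before the full example, a minimal grouped-map sketch (adapted from the Spark documentation; the toy columns id and v are illustrative, and spark is the active SparkSession) shows this split-apply-combine flow:

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    import pandas as pd

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf contains all rows of one group as a pandas DataFrame
        return pdf.assign(v=pdf.v - pdf.v.mean())

    df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))
    df.groupby("id").apply(subtract_mean).show()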

    An example of networkx eigenvector centrality running on PySpark is below. I group per cluster_id, which is the result of the graphframes connected components function:

    import networkx as nx
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType


    def eigencentrality(
        sparkdf, src="src", dst="dst", cluster_id_colname="cluster_id",
    ):
        """
        Args:
            sparkdf: input edgelist Spark DataFrame
            src: src column name
            dst: dst column name
            cluster_id_colname: name of the Graphframes connected-components cluster_id column
        Returns:
            node_id: id of the node
            eigen_centrality: eigenvector centrality of the node within its cluster
            cluster_id: cluster_id corresponding to the node_id
        """
        ecschema = StructType(
            [
                StructField("node_id", StringType()),
                StructField("eigen_centrality", DoubleType()),
                StructField(cluster_id_colname, LongType()),
            ]
        )

        psrc = src
        pdst = dst

        @pandas_udf(ecschema, PandasUDFType.GROUPED_MAP)
        def eigenc(pdf: pd.DataFrame) -> pd.DataFrame:
            # build an undirected networkx graph from this group's edges
            nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst)
            ec = nx.eigenvector_centrality(nxGraph, tol=1e-03)
            out_df = (
                pd.DataFrame.from_dict(ec, orient="index", columns=["eigen_centrality"])
                .reset_index()
                .rename(columns={"index": "node_id"})
            )

            # every row of this group shares the same cluster_id
            out_df[cluster_id_colname] = pdf[cluster_id_colname].iloc[0]
            return out_df

        out = sparkdf.groupby(cluster_id_colname).apply(eigenc)
        return out
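
    A hedged usage sketch for the question's data follows. It assumes graphframes is installed, builds an edge DataFrame from the Provider and AttendingPhysician columns, runs connectedComponents to obtain a cluster_id per node, and then calls eigencentrality; the checkpoint path and intermediate names are illustrative. (On Spark 3.x the same grouped logic can also be expressed with groupby(...).applyInPandas instead of the deprecated GROUPED_MAP decorator.)

    from graphframes import GraphFrame
    import pyspark.sql.functions as F

    # edges: Provider -> AttendingPhysician (column names from the question)
    edges = (
        inpatient
        .select(F.col("Provider").alias("src"), F.col("AttendingPhysician").alias("dst"))
        .dropna()
    )
    vertices = (
        edges.select(F.col("src").alias("id"))
        .union(edges.select(F.col("dst").alias("id")))
        .distinct()
    )

    # connectedComponents requires a checkpoint directory (path is illustrative)
    spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")
    components = GraphFrame(vertices, edges).connectedComponents()  # columns: id, component

    # tag each edge with the component of its source node and use it as the cluster_id
    edges_with_cluster = (
        edges.join(components.withColumnRenamed("id", "src"), on="src")
        .withColumnRenamed("component", "cluster_id")
    )

    result = eigencentrality(
        edges_with_cluster, src="src", dst="dst", cluster_id_colname="cluster_id"
    )
    result.show()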
    

    NB. I have created the splink_graph package in order to run parallelised networkx graph operations like the one above efficiently on PySpark. This is where this code comes from. If you are interested, have a look there to see how other graph metrics have been implemented...