I have heard that it is possible to call methods from other Python modules to perform calculations that are not implemented in Spark, and that doing so is of course inefficient. I need a way to compute the eigenvector centrality of a graph (it is not available in the graphframes module). I am aware that there is a way to do this in Scala using sparkling-graph, but I need Python so that everything stays in one place. I am new to Spark RDDs, and I am wondering what is wrong with the code below, or whether this is even a proper way of doing it:
import networkx as nx

def func1(dt):
    G = nx.Graph()
    src = dt.Provider
    dest = dt.AttendingPhysician
    gr = src.zip(dest)
    G = nx.from_edgelist(gr)
    deg = nx.eigenvector_centrality(G)
    return deg

rdd2 = inpatient.rdd.map(lambda x: func1(x))
rdd2.collect()
inpatient is a DataFrame read from a CSV file, from which I want to build a directed graph whose edges go from the nodes in the Provider column to the nodes in the AttendingPhysician column.
The error I am encountering is:
AttributeError: 'str' object has no attribute 'zip'
The error itself arises because rdd.map passes one Row at a time to func1, so dt.Provider is a single string, which has no zip method. What you need instead is the user-defined function (UDF) functionality. Plain Python UDFs are not very efficient, but the pandas_udf GROUPED_MAP functionality lets you use UDFs efficiently.
From the documentation:
Grouped map Pandas UDFs first splits a Spark DataFrame into groups based on the conditions specified in the groupby operator, applies a user-defined function (pandas.DataFrame -> pandas.DataFrame) to each group, combines and returns the results as a new Spark DataFrame.
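For orientation, here is a minimal sketch of that grouped-map pattern before the networkx example. The column names (id, v) and the doubling logic are purely illustrative, and an existing SparkSession named spark is assumed; on recent Spark versions you would typically use groupby(...).applyInPandas(...) instead of the decorator shown here.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# output schema of the grouped-map UDF
schema = StructType([
    StructField("id", StringType()),
    StructField("v_doubled", DoubleType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def double_values(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows of one group as a pandas DataFrame
    return pd.DataFrame({"id": pdf["id"], "v_doubled": 2.0 * pdf["v"]})

# each distinct id becomes one group; the per-group results are combined
# back into a single Spark DataFrame
df = spark.createDataFrame([("a", 1.0), ("a", 2.0), ("b", 3.0)], ["id", "v"])
result = df.groupby("id").apply(double_values)
result.show()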
An example of networkx eigenvector centrality running on PySpark is below. I group per cluster_id, which is the result of the graphframes
connected components function:
import networkx as nx
import pandas as pd
from networkx import eigenvector_centrality
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

def eigencentrality(
    sparkdf, src="src", dst="dst", cluster_id_colname="cluster_id",
):
    """
    Args:
        sparkdf: input edgelist Spark DataFrame
        src: src column name
        dst: dst column name
        cluster_id_colname: Graphframes-created connected components cluster_id
    Returns:
        A Spark DataFrame with columns:
            node_id
            eigen_centrality: eigenvector centrality of the node within its cluster
            cluster_id: cluster_id corresponding to the node_id
    """
    ecschema = StructType(
        [
            StructField("node_id", StringType()),
            StructField("eigen_centrality", DoubleType()),
            StructField(cluster_id_colname, LongType()),
        ]
    )

    psrc = src
    pdst = dst

    @pandas_udf(ecschema, PandasUDFType.GROUPED_MAP)
    def eigenc(pdf: pd.DataFrame) -> pd.DataFrame:
        # build an undirected networkx graph from this group's edge list
        nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst)
        ec = eigenvector_centrality(nxGraph, tol=1e-03)
        out_df = (
            pd.DataFrame.from_dict(ec, orient="index", columns=["eigen_centrality"])
            .reset_index()
            .rename(columns={"index": "node_id"})
        )
        # every row of this group carries the same cluster_id
        cluster_id = pdf[cluster_id_colname].iloc[0]
        out_df[cluster_id_colname] = cluster_id
        return out_df

    out = sparkdf.groupby(cluster_id_colname).apply(eigenc)
    return out
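For completeness, a hedged sketch of how this could be wired up for the data in the question follows. The inpatient DataFrame and its Provider / AttendingPhysician columns come from the question; the SparkSession named spark and the checkpoint directory are assumptions, while GraphFrame and connectedComponents are the standard graphframes API.

from graphframes import GraphFrame

# build an edge list from the inpatient DataFrame (column names from the question)
edges = (inpatient
         .selectExpr("Provider as src", "AttendingPhysician as dst")
         .dropna())

# graphframes needs a vertices DataFrame with an "id" column
vertices = (edges.selectExpr("src as id")
            .union(edges.selectExpr("dst as id"))
            .distinct())

# connectedComponents requires a checkpoint directory to be set
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")
g = GraphFrame(vertices, edges)
cc = g.connectedComponents()  # adds a "component" column per vertex

# attach the component id of the source vertex to each edge as "cluster_id"
edges_with_cluster = edges.join(
    cc.selectExpr("id as src", "component as cluster_id"), on="src"
)

result = eigencentrality(edges_with_cluster, src="src", dst="dst",
                         cluster_id_colname="cluster_id")
result.show()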
NB: I have created the splink_graph
package in order to run parallelised networkx graph operations like the above efficiently on PySpark; that is where this code comes from. If you are interested, have a look there to see how other graph metrics have been implemented...