pyspark databricks user-defined-functions

Facing an issue with a Python UDF in Databricks


I'm working with hierarchical data in PySpark where each employee has a manager, and I need to find all the inline managers for each employee. An inline manager is defined as the manager of the manager, and so on, until we reach the top-level manager (CEO) who does not have a manager.

from pyspark.sql.functions import udf, broadcast, col
from pyspark.sql.types import ArrayType, StringType

# Broadcast the DataFrame df
broadcast_df = broadcast(df)

# Define a function to find inline managers
def find_inline_managers(user_id, manager_id):
    inline_managers = []
    while manager_id is not None:
        manager = broadcast_df.filter(col("user_id") == manager_id).select("username").first()[0]
        inline_managers.append(f"{manager}_level_{len(inline_managers) + 1}")
        manager_id = broadcast_df.filter(col("user_id") == manager_id).select("manager_id").first()[0]
    return inline_managers

# Register the UDF
find_inline_managers_udf = udf(find_inline_managers, ArrayType(StringType()))

# Apply the UDF to create the new column
df = df.withColumn("inline_managers", find_inline_managers_udf("user_id", "manager_id"))

Here, I have written the find_inline_managers UDF to create a derived column "inline_managers", but I am getting the following error message:

PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

How can I fix this? If you know an alternative way to solve this problem, please let me know. Thanks.

Note: Recursive CTEs are not supported in Databricks.


Solution

  • Do you have to use PySpark in Databricks?

    If yes, this answer could help you. It does exactly that:

    https://stackoverflow.com/a/77627393/3238085

    Here is another solution that may help. Unfortunately, the person who asked that question has since deleted it.

    https://gist.github.com/dineshdharme/7c13dcde72e42fdd3ec47d1ad40f6177

    Since this is a tree (graph) problem, a graph-processing library such as NetworkX, GraphFrames, or graph-tool may be a better fit.

    I would suggest looking into them. I believe NetworkX or graph-tool is the best choice.

    https://graph-tool.skewed.de/

    What is graph-tool?

    Graph-tool is an efficient Python module for manipulation and statistical analysis of graphs (a.k.a. networks). Contrary to most other Python modules with similar functionality, the core data structures and algorithms are implemented in C++, making extensive use of template metaprogramming, based heavily on the Boost Graph Library. This confers it a level of performance that is comparable (both in memory usage and computation time) to that of a pure C/C++ library.

    Graph-tool can be orders of magnitude faster than Python-only alternatives, and therefore it is specially suited for large-scale network analysis.

    https://graph-tool.skewed.de/static/doc/index.html#installing-graph-tool

    conda create --name gt -c conda-forge graph-tool
    conda activate gt
    

    The following is a simple example of how to do this:

    from graph_tool.all import Graph, graph_draw, BFSVisitor, bfs_search
    from graph_tool.draw import radial_tree_layout
    
    data = [
        ("CEO", "M3"),
        ("M3", "M1"),
        ("M3", "M2"),
        ("M1", "E1"),
        ("M1", "E2"),
        ("M2", "E3")
    ]
    
    g = Graph(directed=True)
    
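    # Map each name to its vertex so that every name is added only once
    vertex_dict = {}
    
    # Vertex property that stores the display name of each vertex
    v_name = g.new_vertex_property("string")
    
    
    def get_vertex(name):
        """Return the vertex for name, creating it on first use."""
        if name not in vertex_dict:
            v = g.add_vertex()
            vertex_dict[name] = v
            v_name[v] = name
        return vertex_dict[name]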
    
    for manager, employee in data:
        v_manager = get_vertex(manager)
        v_employee = get_vertex(employee)
        g.add_edge(v_manager, v_employee)
    
    class PathCollector(BFSVisitor):
        def __init__(self, pred_map):
            self.pred_map = pred_map
    
        def tree_edge(self, e):
            self.pred_map[e.target()] = e.source()
    
    
    def find_paths(root):
        pred_map = g.new_vertex_property("int64_t", -1)
        bfs_search(g, root, PathCollector(pred_map))
    
        def get_path_to(v):
            path = []
            while v != root and g.vertex(v) is not None:
                path.append(v)
                v = pred_map[v]
            path.append(root)
            path.reverse()
            return path
    
        paths = {v_name[v]: [v_name[x] for x in get_path_to(v)] for v in vertex_dict.values() if v != root}
        return paths
    
    
    root_vertex = vertex_dict["CEO"]
    paths = find_paths(root_vertex)
    for emp, path in paths.items():
        path_reversed = reversed(path)
        print(f"Path from {emp} to CEO: {' -> '.join(path_reversed)}")
    
    
    pos = radial_tree_layout(g, g.vertex(vertex_dict["CEO"]))
    graph_draw(g, pos, vertex_text=v_name, vertex_font_size=18,
               output_size=(1000, 1000), output="manager-employee.png")
    

    Output:

    Path from M3 to CEO: M3 -> CEO
    Path from M1 to CEO: M1 -> M3 -> CEO
    Path from M2 to CEO: M2 -> M3 -> CEO
    Path from E1 to CEO: E1 -> M1 -> M3 -> CEO
    Path from E2 to CEO: E2 -> M1 -> M3 -> CEO
    Path from E3 to CEO: E3 -> M2 -> M3 -> CEO
    

    Image generated by the library (saved as manager-employee.png): not reproduced here.
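
As for the PicklingError itself: the UDF in the question references a DataFrame (broadcast_df) inside the function, and a DataFrame cannot be pickled and shipped to the workers. One possible workaround, sketched below, is to collect the hierarchy into a plain Python dict on the driver and walk up the chain with ordinary dict lookups. This is only a sketch and assumes the hierarchy is small enough to fit in driver memory; the sample rows, the `hierarchy` dict, the `lookup` parameter, and the cycle guard are my own additions for illustration and are not part of the original code.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical sample rows: (user_id, username, manager_id).
# On Databricks these would come from something like
# df.select("user_id", "username", "manager_id").collect(),
# assuming the hierarchy fits in driver memory.
rows: List[Tuple[int, str, Optional[int]]] = [
    (1, "ceo", None),
    (2, "m3", 1),
    (3, "m1", 2),
    (4, "e1", 3),
]

# Plain dict: user_id -> (username, manager_id).
# Unlike a DataFrame, a dict can be pickled and sent to the workers.
hierarchy: Dict[int, Tuple[str, Optional[int]]] = {
    user_id: (username, manager_id) for user_id, username, manager_id in rows
}


def find_inline_managers(
    manager_id: Optional[int],
    lookup: Dict[int, Tuple[str, Optional[int]]],
) -> List[str]:
    """Walk up the chain from manager_id, labelling each manager with its level."""
    inline_managers: List[str] = []
    seen = set()  # assumption: guard against accidental cycles in the data
    while manager_id is not None and manager_id in lookup and manager_id not in seen:
        seen.add(manager_id)
        username, next_manager_id = lookup[manager_id]
        inline_managers.append(f"{username}_level_{len(inline_managers) + 1}")
        manager_id = next_manager_id
    return inline_managers


for user_id, (username, manager_id) in hierarchy.items():
    print(username, find_inline_managers(manager_id, hierarchy))
```

In a notebook, the function could then be wrapped as `udf(lambda m: find_inline_managers(m, hierarchy), ArrayType(StringType()))` and applied to the `manager_id` column. For a larger dict, passing it through `spark.sparkContext.broadcast(hierarchy)` and reading `.value` inside the UDF is the usual variant. I have not run either of these on Databricks, so treat the wiring as an assumption.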