
Efficient Way to Build Large Scale Hierarchical Data Tree Path


I have a large dataset (think: big data) of network elements that form a tree-like network.

A toy dataset looks like this:

|   id | type   | parent_id   |
|-----:|:-------|:------------|
|    1 | D      | <NA>        |
|    2 | C      | 1           |
|    3 | C      | 2           |
|    4 | C      | 3           |
|    5 | B      | 3           |
|    6 | B      | 4           |
|    7 | A      | 4           |
|    8 | A      | 5           |
|    9 | A      | 3           |

Important rules:

I would like an efficient way (preferably in Spark) to calculate the tree path for each node, like so:

|   id | type   | parent_id   | path                |
|-----:|:-------|:------------|:--------------------|
|    1 | D      | <NA>        | D:1                 |
|    2 | C      | 1           | D:1>C:2             |
|    3 | C      | 2           | D:1>C:2>C:3         |
|    4 | C      | 3           | D:1>C:2>C:3>C:4     |
|    5 | B      | 3           | D:1>C:2>C:3>B:5     |
|    6 | B      | 4           | D:1>C:2>C:3>C:4>B:6 |
|    7 | A      | 4           | D:1>C:2>C:3>C:4>A:7 |
|    8 | A      | 5           | D:1>C:2>C:3>B:5>A:8 |
|    9 | A      | 3           | D:1>C:2>C:3>A:9     |

Note:

Each element in the tree path is constructed like this: type:id.

If you know other efficient ways to store and calculate the tree path (e.g., closure tables), I am happy to hear about them as well. However, the calculation must run in less than an hour (preferably minutes), and later retrieval must take no more than a few seconds.
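To make the closure-table idea concrete, here is a minimal pure-Python sketch on the toy data. It stores one row per (ancestor, descendant) pair, including a depth-0 row for each node itself. The names `build_closure` and `descendants` are hypothetical, and the sketch assumes every `parent_id` refers to an existing node and that the data contains no cycles.

```python
from collections import defaultdict

# Toy data from the table above: id -> parent_id (None marks the root).
parents = {1: None, 2: 1, 3: 2, 4: 3, 5: 3, 6: 4, 7: 4, 8: 5, 9: 3}


def build_closure(parents):
    """Return (ancestor, descendant, depth) rows, including depth-0 self rows."""
    rows = []
    for node in parents:
        ancestor, depth = node, 0
        # Walk from the node up to the root, emitting one row per ancestor.
        while ancestor is not None:
            rows.append((ancestor, node, depth))
            ancestor = parents[ancestor]
            depth += 1
    return rows


closure = build_closure(parents)

# Index descendants by ancestor so a subtree lookup is a single dict access.
descendants = defaultdict(list)
for ancestor, descendant, _depth in closure:
    descendants[ancestor].append(descendant)

subtree_of_3 = sorted(descendants[3])
```

The table grows with the depth of the tree (one row per node per ancestor), so whether it fits the time budget for 3M nodes would have to be measured.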

The end goal is a data structure that lets me aggregate all network nodes underneath a given node efficiently (a runtime of a few seconds at most).
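As an illustration of how a stored path could serve that aggregation, the sketch below counts node types underneath a node with a plain prefix match on the path strings from the table above. The function name `count_types_below` is hypothetical, and this is only a toy in-memory version of the idea.

```python
from collections import Counter

# Paths copied from the toy result table: id -> (type, path).
nodes = {
    1: ("D", "D:1"),
    2: ("C", "D:1>C:2"),
    3: ("C", "D:1>C:2>C:3"),
    4: ("C", "D:1>C:2>C:3>C:4"),
    5: ("B", "D:1>C:2>C:3>B:5"),
    6: ("B", "D:1>C:2>C:3>C:4>B:6"),
    7: ("A", "D:1>C:2>C:3>C:4>A:7"),
    8: ("A", "D:1>C:2>C:3>B:5>A:8"),
    9: ("A", "D:1>C:2>C:3>A:9"),
}


def count_types_below(nodes, node_id, node_sep=">"):
    """Count node types strictly below node_id using a path-prefix match."""
    # The trailing separator keeps "C:3" from also matching e.g. "C:30".
    prefix = nodes[node_id][1] + node_sep
    return Counter(t for t, path in nodes.values() if path.startswith(prefix))


below_3 = count_types_below(nodes, 3)
```

In a database, the same test would typically be a predicate such as `path LIKE 'D:1>C:2>C:3>%'`.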

The actual dataset consisting of around 3M nodes can be constructed like this:

Note:

import random
import pandas as pd
random.seed(1337)
node_counts = {'A': 1424383, 'B': 596994, 'C': 234745, 'D': 230937, 'E': 210663, 'F': 122859, 'G': 119453, 'H': 57462, 'I': 23260, 'J': 15008, 'K': 10666, 'L': 6943, 'M': 6724, 'N': 2371, 'O': 2005, 'P': 385}
#node_counts = {'A': 3, 'B': 2, 'C': 3, 'D': 1}
elements = list()
candidates = list()
root_type = list(node_counts.keys())[-1]
leaf_type = list(node_counts.keys())[0]
root_counts = node_counts[root_type]
leaves_count = node_counts[leaf_type]
ids = [i + 1 for i in range(sum(node_counts.values()))]
idcounter = 0
for i, (name, count) in enumerate(sorted(node_counts.items(), reverse=True)):
    for _ in range(count):
        _id = ids[idcounter]
        idcounter += 1
        _type = name
        if i == 0:
            _parent = None
        else:
            # select a random one that is not a root or a leaf
            if len(candidates) == 0: # first bootstrap case
                candidate = random.choice(elements)
            else:
                candidate = random.choice(candidates)
            _parent = candidate['id']
        _obj = {'id': _id, 'type': _type, 'parent_id': _parent}
        #print(_obj)
        elements.append(_obj)
        if _type != root_type and _type != leaf_type:
            candidates.append(_obj)
df = pd.DataFrame.from_dict(elements).astype({'parent_id': 'Int64'})

To produce the tree path in pure Python for the toy data above, you can use the following function:

def get_hierarchy_path(df, cache_dict, ID='id', LABEL = 'type', PARENT_ID = 'parent_id', node_sep='|', elem_sep=':'):
    def get_path(record):
        if pd.isna(record[PARENT_ID]):
            return f'{record[LABEL]}{elem_sep}{record[ID]}'
        else:
            if record[PARENT_ID] in cache_dict:
                parent_path = cache_dict[record[PARENT_ID]]
            else:
                try:
                    parent_path = get_path(df.query(f'{ID} == {record[PARENT_ID]}').iloc[0])
                except IndexError as e:
                    print(f'Index Miss for {record[PARENT_ID]} on record {record.to_dict()}')
                    parent_path = f'{record[LABEL]}{elem_sep}{record[ID]}'
                cache_dict[record[PARENT_ID]] = parent_path
            return f"{parent_path}{node_sep}{record[LABEL]}{elem_sep}{record[ID]}"
    return df.apply(get_path, axis=1)
df['path'] = get_hierarchy_path(df, dict(), node_sep='>')
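The function above runs `df.query` on every cache miss, which scans the frame each time. A possible variant, sketched below, first loads the records into a plain dictionary and then resolves each path exactly once without recursion. The name `build_paths` is hypothetical; the sketch assumes every `parent_id` refers to an existing node, that roots are marked with `None`, and that the data contains no cycles.

```python
def build_paths(records, node_sep=">", elem_sep=":"):
    """records: iterable of (id, type, parent_id); returns {id: path}."""
    info = {node_id: (node_type, parent_id) for node_id, node_type, parent_id in records}
    paths = {}
    for start in info:
        # Walk up until we reach a node whose path is already known (or pass the root).
        chain = []
        node = start
        while node is not None and node not in paths:
            chain.append(node)
            node = info[node][1]
        prefix = paths[node] if node is not None else None
        # Unwind from the topmost unresolved node back down to `start`.
        for current in reversed(chain):
            label = f"{info[current][0]}{elem_sep}{current}"
            prefix = label if prefix is None else f"{prefix}{node_sep}{label}"
            paths[current] = prefix
    return paths


toy_records = [
    (1, "D", None), (2, "C", 1), (3, "C", 2), (4, "C", 3), (5, "B", 3),
    (6, "B", 4), (7, "A", 4), (8, "A", 5), (9, "A", 3),
]
toy_paths = build_paths(toy_records)
```

With the DataFrame above, `parent_id` values of `<NA>` would need to be converted to `None` before being passed in.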

What I have already tried:

Thank you for your help.


Solution

  • My current solution no longer relies on Spark but on SQL. I load the whole dataset into a Postgres database and place a unique index on id, type and parent_id.

    Then using the following query, I can calculate the path:

    with recursive recursive_hierarchy AS (
        -- starting point
        select 
            parent_id
            , id 
            , type 
            , type || ':' || id as path
            , 1 as lvl
        from hierarchy.nodes
    
        union all
        -- recursion
        select
            ne.parent_id as parent_id
            , h.id 
            , h.type 
            , ne.type || ':' || ne.id || '|' || h.path as path
            , h.lvl + 1 as lvl
        from (
            select * 
            from hierarchy.nodes
        ) ne
        inner join recursive_hierarchy h
            on ne.id = h.parent_id
    ), paths as (
        -- complete results
        select 
            *
        from recursive_hierarchy
    ), max_lvl as (
        -- retrieve the longest path of a network element
        select
            id
            , max(lvl) as max_lvl
        from paths
        group by id
    )
    -- all results with only the longest path of a network element
    select distinct
        p.id
        , p.type
        , p.path
    from paths p
        inner join max_lvl l
            on p.id = l.id
            and p.lvl = l.max_lvl
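The query above seeds the recursion with every node, walks upward, and then keeps the longest path per node. A possible alternative is to seed the recursion with the root nodes only and walk downward, so that each node is produced once with its final path. The sketch below shows that variant on the toy data, using SQLite through Python's `sqlite3` module purely to stay self-contained. The table name `nodes`, the index name, and the `>` separator are illustrative assumptions; this is not the Postgres setup described above and has not been benchmarked on the 3M-node dataset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nodes (id INTEGER PRIMARY KEY, type TEXT, parent_id INTEGER)")
conn.executemany(
    "INSERT INTO nodes VALUES (?, ?, ?)",
    [
        (1, "D", None), (2, "C", 1), (3, "C", 2), (4, "C", 3), (5, "B", 3),
        (6, "B", 4), (7, "A", 4), (8, "A", 5), (9, "A", 3),
    ],
)
# The recursive join looks up children by parent_id, so index that column.
conn.execute("CREATE INDEX idx_nodes_parent ON nodes (parent_id)")

query = """
WITH RECURSIVE tree(id, type, path) AS (
    -- anchor: root nodes have no parent
    SELECT id, type, type || ':' || id
    FROM nodes
    WHERE parent_id IS NULL
    UNION ALL
    -- recursion: append each child to its parent's already-built path
    SELECT n.id, n.type, t.path || '>' || n.type || ':' || n.id
    FROM nodes n
    JOIN tree t ON n.parent_id = t.id
)
SELECT id, type, path FROM tree ORDER BY id
"""
rows = conn.execute(query).fetchall()
```

Because each node appears only once in the result, the `max(lvl)` and `distinct` steps are not needed in this form.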