Tags: python, dataframe, python-polars, data-transform

Python Polars dataframe transformation: from flat dataframe to one dataframe per category


I have a flat dataframe representing data in multiple databases, where each database has multiple tables, each table has multiple columns, and each column has multiple values:

import polars as pl

df = pl.DataFrame(
    {
        'db_id': ["db_1", "db_1", "db_1", "db_2", "db_2", "db_2"],
        'table_id': ['tab_1', 'tab_1', 'tab_2', 'tab_1', 'tab_2', 'tab_2'],
        'column_id': ['col_1', 'col_2', 'col_1', 'col_2', 'col_1', 'col_3'],
        'data': [[1, 2, 3], [10, 20, 30], [4, 5], [40, 50], [6], [60]]
    }
)
shape: (6, 4)
┌───────┬──────────┬───────────┬──────────────┐
│ db_id ┆ table_id ┆ column_id ┆ data         │
│ ---   ┆ ---      ┆ ---       ┆ ---          │
│ str   ┆ str      ┆ str       ┆ list[i64]    │
╞═══════╪══════════╪═══════════╪══════════════╡
│ db_1  ┆ tab_1    ┆ col_1     ┆ [1, 2, 3]    │
│ db_1  ┆ tab_1    ┆ col_2     ┆ [10, 20, 30] │
│ db_1  ┆ tab_2    ┆ col_1     ┆ [4, 5]       │
│ db_2  ┆ tab_1    ┆ col_2     ┆ [40, 50]     │
│ db_2  ┆ tab_2    ┆ col_1     ┆ [6]          │
│ db_2  ┆ tab_2    ┆ col_3     ┆ [60]         │
└───────┴──────────┴───────────┴──────────────┘

As you can see, different databases share some tables, and tables share some columns.

I want to extract one dataframe per table_id from the above dataframe, where each extracted dataframe is transposed and exploded, i.e. its columns are the set of column_ids belonging to that table_id (plus db_id), and its values are the corresponding values from data. For the above example, the result should be a dictionary with keys "tab_1" and "tab_2" and the following dataframes as values:

tab_1:

shape: (5, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_2 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 1     ┆ 10    │
│ db_1  ┆ 2     ┆ 20    │
│ db_1  ┆ 3     ┆ 30    │
│ db_2  ┆ null  ┆ 40    │
│ db_2  ┆ null  ┆ 50    │
└───────┴───────┴───────┘

tab_2:

shape: (3, 3)
┌───────┬───────┬───────┐
│ db_id ┆ col_1 ┆ col_3 │
│ ---   ┆ ---   ┆ ---   │
│ str   ┆ i64   ┆ i64   │
╞═══════╪═══════╪═══════╡
│ db_1  ┆ 4     ┆ null  │
│ db_1  ┆ 5     ┆ null  │
│ db_2  ┆ 6     ┆ 60    │
└───────┴───────┴───────┘

I have a working function that does just that (see below), but it's a bit slow. So, I'm wondering if there is a faster way to achieve this?

This is my current solution:

from typing import Dict, Sequence


def dataframe_per_table(
    df: pl.DataFrame,
    col_name__table_id: str = "table_id",
    col_name__col_id: str = "column_id",
    col_name__values: str = "data",
    col_name__other_ids: Sequence[str] = ("db_id", )
) -> Dict[str, pl.DataFrame]:

    col_name__other_ids = list(col_name__other_ids)
    table_dfs = {}

    # Pivot each (table_id, *other_ids) group on its own, then merge all pivots
    # belonging to the same table via a diagonal concat.
    for (table_name, *_), table in df.group_by(
        [col_name__table_id] + col_name__other_ids
    ):
        new_table = table.select(
            pl.col(col_name__other_ids + [col_name__col_id, col_name__values])
        ).pivot(
            on=col_name__col_id,
            index=col_name__other_ids,
            values=col_name__values,
            aggregate_function=None,
        ).explode(
            columns=table[col_name__col_id].unique().to_list()
        )

        table_dfs[table_name] = pl.concat(
            [table_dfs.setdefault(table_name, pl.DataFrame()), new_table],
            how="diagonal"
        )
    return table_dfs
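
For reference, calling it on the example dataframe above gives the expected dict; I only check keys and shapes here, since row/column order within each frame can vary from run to run (group_by does not maintain order by default):

table_dfs = dataframe_per_table(df)
print(sorted(table_dfs))         # ['tab_1', 'tab_2']
print(table_dfs["tab_1"].shape)  # (5, 3)
print(table_dfs["tab_2"].shape)  # (3, 3)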

Update: Benchmarking/Summary of Answers

On a dataframe with ~2.5 million rows, my original solution takes about 70 minutes to complete.

Disclaimer: since the execution times were too long, I only timed each solution once (i.e. 1 run, 1 loop), so the margin of error is large.
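
(A single run can be timed with nothing more elaborate than a wall-clock measurement like this sketch; the figures above are from my real ~2.5-million-row dataframe, not the toy example.)

import time

start = time.perf_counter()
result = dataframe_per_table(df)  # likewise for the _v2 / _v3 variants below
print(f"{time.perf_counter() - start:.1f} s for {len(result)} tables")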

However, right after posting the question, I realized I can make it much faster just by performing the concat in a separate loop, so that each final dataframe is created by one concat operation instead of many:

def dataframe_per_table_v2(
    df: pl.DataFrame,
    col_name__table_id: str = "table_id",
    col_name__col_id: str = "column_id",
    col_name__values: str = "data",
    col_name__other_ids: Sequence[str] = ("db_id", )
) -> Dict[str, pl.DataFrame]:

    col_name__other_ids = list(col_name__other_ids)
    table_dfs = {}

    for (table_name, *_), table in df.group_by(
        [col_name__table_id] + col_name__other_ids
    ):
        new_table = table.select(
            pl.col(col_name__other_ids + [col_name__col_id, col_name__values])
        ).pivot(
            on=col_name__col_id,
            index=col_name__other_ids,
            values=col_name__values,
            aggregate_function=None,
        ).explode(
            columns=table[col_name__col_id].unique().to_list()
        )

        # Up until here nothing is changed.
        # Now, instead of directly concatenating, we just 
        #  append the new dataframe to a list
        table_dfs.setdefault(table_name, list()).append(new_table)

    # Now, in a separate loop, each final dataframe is created
    #  by concatenating all collected dataframes once.
    for table_name, table_sub_dfs in table_dfs.items():
        table_dfs[table_name] = pl.concat(
            table_sub_dfs,
            how="diagonal"
        )
    return table_dfs

This reduced the time from 70 min to about 10 min; much better, but still too long.

In comparison, the answer by @jqurious takes about 5 min. It needs an additional step at the end to remove the unwanted columns and get a dict from the list, but it's still much faster.

However, the winner is by far the answer by @Dean MacGregor, taking only 50 seconds and directly producing the desired output.

Here is their solution re-written as a function:

def dataframe_per_table_v3(
    df: pl.DataFrame,
    col_name__table_id: str = "table_id",
    col_name__col_id: str = "column_id",
    col_name__values: str = "data",
    col_name__other_ids: Sequence[str] = ("db_id", )
) -> Dict[str, pl.DataFrame]:

    table_dfs = {
        table_id: df.filter(
            pl.col(col_name__table_id) == table_id
        ).with_columns(
            idx_data=pl.int_ranges(pl.col(col_name__values).list.len())
        ).explode(
            [col_name__values, 'idx_data']
        ).pivot(
            on=col_name__col_id,
            values=col_name__values,
            index=[*col_name__other_ids, 'idx_data'],
            aggregate_function='first'
        ).drop(
            'idx_data'
        )
        for table_id in df.get_column(col_name__table_id).unique()
    }
    return table_dfs
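
As a quick sanity check on the toy dataframe from the question, this version agrees with my original function once rows and columns are put into a canonical order (the two versions don't guarantee the same ordering):

v1 = dataframe_per_table(df)
v3 = dataframe_per_table_v3(df)
for tab in v1:
    cols = sorted(v1[tab].columns)
    # sort rows and columns before comparing, since the ordering may differ
    assert v1[tab].select(cols).sort(cols).equals(v3[tab].select(cols).sort(cols))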

Solution

  • Let's break it down for a single table, so we'll just look at:

    df.filter(pl.col('table_id')=='tab_1')
    
    shape: (3, 4)
    ┌───────┬──────────┬───────────┬──────────────┐
    │ db_id ┆ table_id ┆ column_id ┆ data         │
    │ ---   ┆ ---      ┆ ---       ┆ ---          │
    │ str   ┆ str      ┆ str       ┆ list[i64]    │
    ╞═══════╪══════════╪═══════════╪══════════════╡
    │ db_1  ┆ tab_1    ┆ col_1     ┆ [1, 2, 3]    │
    │ db_1  ┆ tab_1    ┆ col_2     ┆ [10, 20, 30] │
    │ db_2  ┆ tab_1    ┆ col_2     ┆ [40, 50]     │
    └───────┴──────────┴───────────┴──────────────┘
    

    We want the output rows to be keyed by db_id combined with each value's position within its data list.

    We need to explicitly create that positional index, which we can do with int_ranges:

    df.filter(pl.col('table_id')=='tab_1') \
        .with_columns(datai=pl.int_ranges(pl.col('data').list.len()))
    
    shape: (3, 5)
    ┌───────┬──────────┬───────────┬──────────────┬───────────┐
    │ db_id ┆ table_id ┆ column_id ┆ data         ┆ datai     │
    │ ---   ┆ ---      ┆ ---       ┆ ---          ┆ ---       │
    │ str   ┆ str      ┆ str       ┆ list[i64]    ┆ list[i64] │
    ╞═══════╪══════════╪═══════════╪══════════════╪═══════════╡
    │ db_1  ┆ tab_1    ┆ col_1     ┆ [1, 2, 3]    ┆ [0, 1, 2] │
    │ db_1  ┆ tab_1    ┆ col_2     ┆ [10, 20, 30] ┆ [0, 1, 2] │
    │ db_2  ┆ tab_1    ┆ col_2     ┆ [40, 50]     ┆ [0, 1]    │
    └───────┴──────────┴───────────┴──────────────┴───────────┘
    

    Now we just explode/pivot to get

     df \
        .filter(pl.col('table_id')=='tab_1') \
        .with_columns(datai=pl.int_ranges(pl.col('data').list.len())) \
        .explode('data','datai') \
        .pivot(on='column_id', index=['db_id', 'datai'], values='data') \
        .drop('datai')
    
    
    shape: (5, 3)
    ┌───────┬───────┬───────┐
    │ db_id ┆ col_1 ┆ col_2 │
    │ ---   ┆ ---   ┆ ---   │
    │ str   ┆ i64   ┆ i64   │
    ╞═══════╪═══════╪═══════╡
    │ db_1  ┆ 1     ┆ 10    │
    │ db_1  ┆ 2     ┆ 20    │
    │ db_1  ┆ 3     ┆ 30    │
    │ db_2  ┆ null  ┆ 40    │
    │ db_2  ┆ null  ┆ 50    │
    └───────┴───────┴───────┘
    

    Lastly, we just wrap the above in a dictionary comprehension, replacing the hardcoded 'tab_1' with our iterator.

    {tab:df \
        .filter(pl.col('table_id')==tab) \
        .with_columns(datai=pl.int_ranges(pl.col('data').list.len())) \
        .explode(['data','datai']) \
        .pivot(on='column_id', index=['db_id','datai'], values='data') \
        .drop('datai') for tab in df.get_column('table_id').unique()}
    
    
    {'tab_1': shape: (5, 3)
    ┌───────┬───────┬───────┐
    │ db_id ┆ col_1 ┆ col_2 │
    │ ---   ┆ ---   ┆ ---   │
    │ str   ┆ i64   ┆ i64   │
    ╞═══════╪═══════╪═══════╡
    │ db_1  ┆ 1     ┆ 10    │
    │ db_1  ┆ 2     ┆ 20    │
    │ db_1  ┆ 3     ┆ 30    │
    │ db_2  ┆ null  ┆ 40    │
    │ db_2  ┆ null  ┆ 50    │
    └───────┴───────┴───────┘,
    'tab_2': shape: (3, 3)
    ┌───────┬───────┬───────┐
    │ db_id ┆ col_1 ┆ col_3 │
    │ ---   ┆ ---   ┆ ---   │
    │ str   ┆ i64   ┆ i64   │
    ╞═══════╪═══════╪═══════╡
    │ db_1  ┆ 4     ┆ null  │
    │ db_1  ┆ 5     ┆ null  │
    │ db_2  ┆ 6     ┆ 60    │
    └───────┴───────┴───────┘}