Tags: python, lazy-evaluation, python-polars

Replace pivot operation for use in lazy evaluation with polars


I have a set of events at timestamps, and for each timestamp I need the sum of the "last" values of each username. This can be done with a pivot table, but I would like to use LazyFrame, because with many unique usernames, the pivot table would overflow RAM. However, LazyFrame does not support pivot.

The number of unique usernames is on the order of thousands, and the number of events is in the tens of millions.

A working example with pivot and DataFrame:

The input dataframe:

import polars as pl

df = pl.from_repr("""
┌────────────┬──────────┬───────┐
│ timestamp  ┆ username ┆ kudos │
│ ---        ┆ ---      ┆ ---   │
│ i64        ┆ str      ┆ i64   │
╞════════════╪══════════╪═══════╡
│ 1690886106 ┆ ABC      ┆ 123   │
│ 1690886107 ┆ DEF      ┆ 10    │
│ 1690886110 ┆ DEF      ┆ 12    │
│ 1690886210 ┆ GIH      ┆ 0     │
└────────────┴──────────┴───────┘
""")

I can achieve the task using pivot:

(
    df.pivot(
        on="username",
        index="timestamp",
        values=["kudos"],
        aggregate_function="last",
    )
    .select(pl.all().forward_fill())
    .fill_null(strategy="zero")
    .select(pl.col("timestamp"), pl.sum_horizontal(df["username"].unique().to_list()).alias("sum"))
)

The results are correct:

shape: (4, 2)
┌────────────┬─────┐
│ timestamp  ┆ sum │
│ ---        ┆ --- │
│ i64        ┆ i64 │
╞════════════╪═════╡
│ 1690886106 ┆ 123 │
│ 1690886107 ┆ 133 │
│ 1690886110 ┆ 135 │
│ 1690886210 ┆ 135 │
└────────────┴─────┘

How would one implement this with LazyFrame such that it is efficient for a large number of unique usernames (i.e. using lazy evaluation and possibly without a giant sparse pivot table)?


Solution

  • In this case your pivot is basically doing two jobs: one is like a group-by, and the other is creating all the unique combinations of timestamp and username. We can do both without pivot.

    First we create the unique combinations and join them back to the original like this...

    (df
     .select(pl.col('timestamp','username').unique().implode())
     .explode('timestamp')
     .explode('username')
     .join(df, on=['timestamp','username'], how='left'))
    shape: (12, 3)
    ┌────────────┬──────────┬───────┐
    │ timestamp  ┆ username ┆ kudos │
    │ ---        ┆ ---      ┆ ---   │
    │ i64        ┆ str      ┆ i64   │
    ╞════════════╪══════════╪═══════╡
    │ 1690886106 ┆ DEF      ┆ null  │
    │ 1690886106 ┆ GIH      ┆ null  │
    │ 1690886106 ┆ ABC      ┆ 123   │
    │ 1690886107 ┆ DEF      ┆ 10    │
    │ …          ┆ …        ┆ …     │
    │ 1690886110 ┆ ABC      ┆ null  │
    │ 1690886210 ┆ DEF      ┆ null  │
    │ 1690886210 ┆ GIH      ┆ 0     │
    │ 1690886210 ┆ ABC      ┆ null  │
    └────────────┴──────────┴───────┘
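
    Equivalently, the unique-combination step can also be written as an explicit cross join (a sketch of the same product, using polars' how='cross' join):

    # Cross product of unique timestamps and unique usernames,
    # then the same left join back onto the original frame.
    combos = (
        df.select(pl.col('timestamp').unique())
          .join(df.select(pl.col('username').unique()), how='cross')
    )
    combos.join(df, on=['timestamp', 'username'], how='left')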
    
    

    The rest of the operations look pretty similar to what you do after the pivot; the only extra nuance is the use of a window function on the forward_fill / fill_null step (illustrated after the output below).

    Putting it all together you have...

    df=df.lazy()
    (df
     .select(pl.col('timestamp','username').unique().implode())
     .explode('timestamp')
     .explode('username')
     .join(df, on=['timestamp','username'], how='left')
     .with_columns(pl.col('kudos').forward_fill().fill_null(0).over('username'))
     .group_by('timestamp',maintain_order=True)
     .agg(pl.col('kudos').sum())
     .collect())
    shape: (4, 2)
    ┌────────────┬───────┐
    │ timestamp  ┆ kudos │
    │ ---        ┆ ---   │
    │ i64        ┆ i64   │
    ╞════════════╪═══════╡
    │ 1690886106 ┆ 123   │
    │ 1690886107 ┆ 133   │
    │ 1690886110 ┆ 135   │
    │ 1690886210 ┆ 135   │
    └────────────┴───────┘
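
    To see why the window function matters, here is a minimal sketch with made-up data: a plain forward_fill would leak one user's last value into another user's rows, while .over('username') restricts the fill to each user.

    demo = pl.DataFrame({
        "username": ["A", "B", "A", "B"],
        "kudos": [1, None, None, 2],
    })
    # With .over('username'): A's null becomes 1; B's leading null stays null.
    # Without the window, B's null would incorrectly pick up A's 1.
    demo.with_columns(pl.col("kudos").forward_fill().over("username"))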
    

    This should be significantly faster than a pivot with rowwise sums, as neither of those operations is particularly well optimized.

    Update for bigger data

    If the dataset contains only a small fraction of the possible timestamp/username combinations, the above will scale badly, since the exploded cross product becomes far bigger than the data itself (e.g. tens of millions of unique timestamps × ~1000 usernames is on the order of 10^10 rows).

    Alternatively, we can mimic the behavior of pivot using a comprehension of when/then expressions.

    I'm getting errors trying to use the new sum_horizontal, so we also need an efficient way to do a rowwise sum.

    Starting with...

    import numpy as np

    df = pl.DataFrame(
        {
            "timestamp": np.arange(1, 1e6 + 1),
            "username": np.random.randint(0, 1000, size=int(1e6)),
            "kudos": np.random.randint(0, 1000, size=int(1e6)),
        },
        schema={"timestamp": pl.Int64, "username": pl.String, "kudos": pl.Int64},
    )
    

    We want to create a series of all the unique usernames, which we'll use a couple of times:

    usernames=df.get_column('username').unique()

    then convert the df to lazy

    df=df.lazy()

    Now we create an expression for a rowwise sum of all the username columns; we have to use the internal __add__ method:

    rowwise=pl.col(usernames[0]).__add__(pl.col(usernames[1]))
    for username in usernames[2:]:
        rowwise=rowwise.__add__(pl.col(username))
    

    I tried chaining it like rowwise = rowwise + pl.col(username), but that creates a huge nest of parentheses like (a+(b+(c+(d+e(…))))) and makes it die at a later step.
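
    For what it's worth, the same flat fold can be written with functools.reduce, and on newer polars releases pl.sum_horizontal may accept the column list directly (the errors above may be version-specific); a hedged sketch:

    from functools import reduce

    # The same left fold as the loop above, written with reduce.
    rowwise = reduce(lambda acc, u: acc.__add__(pl.col(u)),
                     usernames[1:], pl.col(usernames[0]))

    # On recent polars versions this may work directly instead:
    # rowwise = pl.sum_horizontal(usernames.to_list())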

    Then we do:

    result=(df
     .select(
        ['timestamp'] +
        [(pl.when(pl.col('username')==x).then(pl.col('kudos'))).alias(x) for x in usernames]
    )
     .with_columns(pl.col(usernames).forward_fill().fill_null(0))
     .select('timestamp', rowwise.alias('sum'))
     .collect()
     )
    

    where the first select mimics the pivot, the next with_columns does the same forward fill as before, and the last select implements the rowwise summation.
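
    As a sanity check, the same recipe on the small frame from the question (a sketch; df_small here is a hypothetical name for that 4-row frame) reproduces the pivot result:

    names = ['ABC', 'DEF', 'GIH']   # the unique usernames in the example
    small = df_small.lazy()         # df_small: the from_repr frame above
    rowwise_small = pl.col(names[0]).__add__(pl.col(names[1])).__add__(pl.col(names[2]))
    (small
     .select(['timestamp'] +
             [pl.when(pl.col('username') == x).then(pl.col('kudos')).alias(x) for x in names])
     .with_columns(pl.col(names).forward_fill().fill_null(0))
     .select('timestamp', rowwise_small.alias('sum'))
     .collect())
    # Expected sums: 123, 133, 135, 135 (matching the pivot output).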

    My Jupyter cell runs this in 6.9 s, versus closer to 9 s for the pivot method.

    If the timestamp field has duplicates, you'll need to do something like:

    result.group_by('timestamp', maintain_order=True).agg(pl.col('sum').last())