Tags: python, apply, python-polars

Polars map_elements performance for custom functions


I've enjoyed significant speed-ups with Polars over Pandas, except in one case. I'm a newbie to Polars, so it could just be my incorrect usage. Anyway, here is a toy example: on a single column I need to apply a custom function. In my case it is parse from the probablepeople library (https://github.com/datamade/probablepeople), but the problem is generic.

Plain pandas apply has a similar runtime to Polars, but pandas with parallel_apply from pandarallel (https://github.com/nalepae/pandarallel) gets a speed-up proportional to the number of cores.

It looks to me like Polars uses only a single core for custom functions, or am I missing something?

If I am using Polars correctly, is there maybe a possibility to create a tool like pandarallel for Polars?

!pip install probablepeople
!pip install pandarallel

import pandas as pd
import probablepeople as pp
import polars as pl
from pandarallel import pandarallel

AMOUNT = 1_000_000
#Pandas:
df = pd.DataFrame({'a': ["Mr. Joe Smith"]})
df = df.loc[df.index.repeat(AMOUNT)].reset_index(drop=True)

df['b'] = df['a'].apply(pp.parse)

#Pandarallel:
pandarallel.initialize(progress_bar=True)
df['b_multi'] = df['a'].parallel_apply(pp.parse)

#Polars:
dfp = pl.DataFrame({'a': ["Mr. Joe Smith"]})
dfp = dfp.select(pl.all().repeat_by(AMOUNT).explode())

dfp = dfp.with_columns(pl.col('a').map_elements(pp.parse).alias('b'))



Solution

  • It seems that pandarallel uses multiprocessing (Pool.map_async) to run tasks.

    It also has its own custom progress bar implementation.

    A "simple" way I've found to do this is:

    import multiprocessing
    import polars as pl
    import probablepeople as pp
    from pip._vendor.rich.progress import track  # rich is vendored inside pip
    
    def map_elements_parallel(expr, function, chunksize=8):
        def _run(function, values):
            # Fan the Python function out over a process pool and
            # rebuild a Polars Series from the results.
            with multiprocessing.get_context("spawn").Pool() as pool:
                return pl.Series(pool.imap(function, track(values), chunksize=chunksize))
        # map_batches hands the whole column to _run in one go,
        # instead of calling the function once per element.
        return expr.map_batches(lambda col: _run(function, col))
    
    
    # The __main__ guard is required: the "spawn" start method re-imports
    # this module in each worker process.
    if __name__ == "__main__":
    
        df = pl.DataFrame({
            "name": ["Mr. Joe Smith", "Mrs. I & II Alice Random"]
        })
    
        # 2 rows repeated 500_000 times -> 1_000_000 rows
        df = pl.concat([df] * 500_000)
    
        df = df.with_columns(pp=
            pl.col("name").pipe(map_elements_parallel, pp.parse)
        )
    
        print(df)
    
    Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
    shape: (1_000_000, 2)
    ┌──────────────────────────┬───────────────────────────────────┐
    │ name                     ┆ pp                                │
    │ ---                      ┆ ---                               │
    │ str                      ┆ list[list[str]]                   │
    ╞══════════════════════════╪═══════════════════════════════════╡
    │ Mr. Joe Smith            ┆ [["Mr.", "PrefixMarital"], ["Joe… │
    │ Mrs. I & II Alice Random ┆ [["Mrs.", "PrefixMarital"], ["I"… │
    │ …                        ┆ …                                 │
    

    Notes:


    Performance

    multiprocessing is quite a complex topic.

    Timings seem to vary depending on input size, the specific task being performed, and your specific hardware.

    These are the times I got for the example above (on an 8-core system).

    name                  duration (sec)
    map_elements          77.6496
    pandarallel           13.5538
    imap (chunksize=1)    52.4519
    imap (chunksize=8)    33.1072
    map_async             31.5265
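
    The map_async row corresponds to using Pool.map_async (the approach pandarallel takes) instead of pool.imap inside _run. A rough sketch of such a variant could look like this (map_elements_parallel_async is just a made-up name here, and the progress bar is omitted):

    def map_elements_parallel_async(expr, function, chunksize=8):
        def _run(function, values):
            with multiprocessing.get_context("spawn").Pool() as pool:
                # map_async submits the whole batch at once and returns an
                # AsyncResult; .get() blocks until all chunks are finished.
                result = pool.map_async(function, values, chunksize=chunksize)
                return pl.Series(result.get())
        return expr.map_batches(lambda col: _run(function, col))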

    As pp.parse returns nested lists, I tried dumping to json and using .str.json_decode() to see if there was any difference.

    import json
    
    def dumps(value):
        return json.dumps(pp.parse(value))
    
    .pipe(map_elements_parallel, dumps).str.json_decode()
    
    name                  duration (sec)
    imap (chunksize=1)    36.0517
    imap (chunksize=8)    15.0513
    map_async             14.0317
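
    Putting it together, the JSON route slots into the earlier example like this (a sketch reusing map_elements_parallel and dumps from above):

    df = df.with_columns(pp=
        pl.col("name").pipe(map_elements_parallel, dumps).str.json_decode()
    )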