I've enjoyed significant speed-ups with Polars over Pandas, except in one case. I'm a newbie to Polars, so it could just be my incorrect usage. Anyway, here is a toy example: on a single column I need to apply a custom function; in my case it is parse from the probablepeople library (https://github.com/datamade/probablepeople), but the problem is generic.

Plain pandas apply has a similar runtime to Polars, but pandas with parallel_apply from pandarallel (https://github.com/nalepae/pandarallel) gets a speed-up proportional to the number of cores. It looks to me like Polars uses only a single core for custom functions, or am I missing something?

If I am using Polars correctly, is there perhaps a way to create a tool like pandarallel for Polars?
!pip install probablepeople
!pip install pandarallel
import pandas as pd
import probablepeople as pp
import polars as pl
from pandarallel import pandarallel
AMOUNT = 1_000_000
#Pandas:
df = pd.DataFrame({'a': ["Mr. Joe Smith"]})
df = df.loc[df.index.repeat(AMOUNT)].reset_index(drop=True)
df['b'] = df['a'].apply(pp.parse)
#Pandarallel:
pandarallel.initialize(progress_bar=True)
df['b_multi'] = df['a'].parallel_apply(pp.parse)
#Polars:
dfp = pl.DataFrame({'a': ["Mr. Joe Smith"]})
dfp = dfp.select(pl.all().repeat_by(AMOUNT).explode())
dfp = dfp.with_columns(pl.col('a').map_elements(pp.parse).alias('b'))
It seems that pandarallel uses multiprocessing (Pool.map_async) to run tasks. It also has its own custom progress bar implementation.
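For illustration, the same idea applied directly to a pandas Series looks roughly like this (a simplified sketch with a made-up function name, not pandarallel's actual implementation, which also takes care of chunking and the progress bar):

import multiprocessing

import pandas as pd
import probablepeople as pp

def parallel_apply_sketch(s: pd.Series, function) -> pd.Series:
    # Fan the elements out over a process pool and rebuild a Series
    # from the results.
    with multiprocessing.get_context("spawn").Pool() as pool:
        return pd.Series(pool.map_async(function, s.to_list()).get(), index=s.index)

if __name__ == "__main__":
    s = pd.Series(["Mr. Joe Smith"] * 10_000)
    parsed = parallel_apply_sketch(s, pp.parse)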
A "simple" way I've found to do this is:
Pool.imap()
(map_async cannot be used with track()
as it consumes the iterable)rich.progress.track()
(which is also bundled with pip
) for a progress bar (tqdm is also popular)import multiprocessing
import polars as pl
import probablepeople as pp
from pip._vendor.rich.progress import track
def map_elements_parallel(expr, function, chunksize=8):
    def _run(function, values):
        # Spawn a process pool, apply `function` to each element with imap,
        # wrapping the input in track() to get a progress bar.
        with multiprocessing.get_context("spawn").Pool() as pool:
            return pl.Series(pool.imap(function, track(values), chunksize=chunksize))
    # map_batches() hands the whole column (a Series) to _run.
    return expr.map_batches(lambda col: _run(function, col))
if __name__ == "__main__":
    df = pl.DataFrame({
        "name": ["Mr. Joe Smith", "Mrs. I & II Alice Random"]
    })
    df = pl.concat([df] * 500_000)
    df = df.with_columns(pp =
        pl.col("name").pipe(map_elements_parallel, pp.parse)
    )
    print(df)
Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
shape: (1_000_000, 2)
┌──────────────────────────┬───────────────────────────────────┐
│ name                     ┆ pp                                │
│ ---                      ┆ ---                               │
│ str                      ┆ list[list[str]]                   │
╞══════════════════════════╪═══════════════════════════════════╡
│ Mr. Joe Smith            ┆ [["Mr.", "PrefixMarital"], ["Joe… │
│ Mrs. I & II Alice Random ┆ [["Mrs.", "PrefixMarital"], ["I"… │
│ …                        ┆ …                                 │
└──────────────────────────┴───────────────────────────────────┘
Notes:

- spawn must be used to start multiprocessing with Polars.
- pipe() is used to run our "helper function": pl.col("name").pipe(map_elements_parallel, pp.parse) is just another way of writing map_elements_parallel(pl.col("name"), pp.parse).
- map_batches() is used to pass the "column" to the "custom function".

multiprocessing is quite a complex topic.
Timings seem to vary depending on input size, the specific task being performed, and your specific hardware.
These are the times I got for the example above (on an 8-core system).
| name | duration (sec) |
|---|---|
| map_elements | 77.6496 |
| pandarallel | 13.5538 |
| imap (chunksize=1) | 52.4519 |
| imap (chunksize=8) | 33.1072 |
| map_async | 31.5265 |
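For reference, a map_async variant of the helper would look roughly like this (my own sketch, so the name is made up; there is no progress bar, since map_async consumes the whole iterable up front):

def map_elements_parallel_async(expr, function, chunksize=8):
    def _run(function, values):
        with multiprocessing.get_context("spawn").Pool() as pool:
            # Submit all elements at once and block until the pool finishes.
            return pl.Series(pool.map_async(function, values.to_list(), chunksize=chunksize).get())
    return expr.map_batches(lambda col: _run(function, col))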
As pp.parse returns nested lists, I tried dumping to json and using .str.json_decode() to see if there was any difference.
import json

def dumps(value):
    return json.dumps(pp.parse(value))
.pipe(map_elements_parallel, dumps).str.json_decode()
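In context, this replaces the pipe() call in the earlier with_columns, e.g.:

df = df.with_columns(pp =
    pl.col("name").pipe(map_elements_parallel, dumps).str.json_decode()
)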
| name | duration (sec) |
|---|---|
| imap (chunksize=1) | 36.0517 |
| imap (chunksize=8) | 15.0513 |
| map_async | 14.0317 |