How can I quickly iterate over a large list of polars/pandas dataframes in Python while performing data cleaning/transformation on each dataframe?
My problem: I have a huge list of CSV files (~1K for example), each approximately 20 MB. I convert each CSV file into a dataframe (I have tried both pandas and polars to compare computation time) and apply some data-cleaning transformations per dataframe.
What's an efficient way of doing this? Currently, whether I use a list comprehension, map, or a for loop, converting all ~1K CSV files to dataframes takes ~3 minutes, and the transformation takes ~2 minutes per dataframe (i.e. 2 × 1K = ~2K minutes for all 1K dataframes). (I am using Python 3.11.)
Below are more details of what I have tried so far.
A snippet of my CSV, converted to a dataframe, looks like this (only a few rows and columns shown to give an idea; each actual CSV file has ~10K rows and 400 columns):
df = pl.from_repr("""
┌───────┬───────────────┬────────────────────┐
│ Index ┆ A ┆ B │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ str │
╞═══════╪═══════════════╪════════════════════╡
│ 0 ┆ 203 X 345 457 ┆ 346 X X 457 45 46 │
│ 0 ┆ 11 22 44 890 ┆ 22 33 44 88 90 100 │
│ 0 ┆ X X 456 44 90 ┆ null │
│ 1 ┆ null ┆ 33 456 99 10 10 11 │
│ 1 ┆ null ┆ null │
└───────┴───────────────┴────────────────────┘
""")
So basically I want to transform them into something like this:
┌───────┬─────────────────────────────────┬────────────────────────────────────────┐
│ Index ┆ A ┆ B │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[f64] ┆ list[f64] │
╞═══════╪═════════════════════════════════╪════════════════════════════════════════╡
│ 0 ┆ [203.0, null, 345.0, 457.0] ┆ [346.0, null, null, 457.0, 45.0, 46.0] │
│ 0 ┆ [11.0, 22.0, 44.0, 890.0] ┆ [22.0, 33.0, 44.0, 88.0, 90.0, 100.0] │
│ 0 ┆ [null, null, 456.0, 44.0, 90.0] ┆ null │
│ 1 ┆ null ┆ [33.0, 456.0, 99.0, 10.0, 10.0, 11.0] │
│ 1 ┆ null ┆ null │
└───────┴─────────────────────────────────┴────────────────────────────────────────┘
My code (this is in polars) so far looks like this:
import polars as pl
from functools import reduce

def convertdf(input):
    return pl.read_csv(input)

def applycast(df, col):
    dfcast = (
        df.explode(col)
        .select('Index', pl.col(col))
        .with_columns(
            pl.when(pl.col(col).is_not_null())
            .then(pl.col(col).cast(pl.Float64, strict=False))
        )
        .group_by('Index')
        .agg(pl.col(col))
        .sort('Index')
    )
    df = df.replace(col, dfcast[col])
    return df

def datatransform(df):
    df = df.select(df.columns[1:]).select(pl.col(pl.String).str.split(' '))
    df = df.pipe(lambda df: reduce(lambda df, col: applycast(df, col), df.columns, df))
    return df
csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv']
df_files = list(map(convertdf, csvfiles)) #Time taken: 3mins
dftransformedfiles = [df.pipe(datatransform) for df in df_files] #Time taken: ~2K mins
Basically, as you can see, I am using a list comprehension to loop over the CSV files. Is it possible to execute this in parallel?
Is there any way to apply applycast() to all columns in one shot? Currently I am looping over each column, which seems to be why it takes so long. Though the contents of each column vary, the datatype is always list[str], which needs to be transformed to list[f64].
I tried concatenating all dataframes before applying datatransform(), but the concatenation took even longer. I considered the "Ray" API for parallel execution, but it doesn't support the latest Python 3.11. How can I reduce the computation time, and what is the best way to iterate over multiple columns or the list of dataframes/CSVs?
I think pipe with Python UDFs like this is generally going to be slow, and replace is not needed here. I would do the operation you're looking for with list.eval, which treats every list in the column as its own Series:
df = pl.DataFrame({"x": [0, 1], "y": ["3 X 4.0", None], "z": ["1 X 2", "X X 5"]})
df.with_columns(
    pl.col(pl.String)
    .str.split(" ")
    .list.eval(pl.element().cast(pl.Int64, strict=False))
)
shape: (2, 3)
┌─────┬─────────────────┬─────────────────┐
│ x ┆ y ┆ z │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[i64] ┆ list[i64] │
╞═════╪═════════════════╪═════════════════╡
│ 0 ┆ [3, null, null] ┆ [1, null, 2] │
│ 1 ┆ null ┆ [null, null, 5] │
└─────┴─────────────────┴─────────────────┘
So wrap that with_columns operation in a function and see what the speedup is:
def data_load_transform(filename):
    return pl.read_csv(filename).with_columns(
        pl.col(pl.String)
        .str.split(" ")
        .list.eval(pl.element().cast(pl.Int64, strict=False))
    )
csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv']
dftransformedfiles = [data_load_transform(fn) for fn in csvfiles]
If you need a further speedup, the list comprehension at the end can be parallelized in various ways, e.g. with multiprocessing.