performanceparallel-processingpython-polarsraypython-3.11

How can I speed up the computation time for applying data cleaning and transformation on a list of dataframes in Python using polars/pandas?


How can I iterate over a large list of polars/pandas dataframes in python with a very fast computation time while simultaneously performing data cleaning/transformation on each dataframe?

My problem: I have a huge list of csv files (~1K for example), each file being 20MB approx. I have converted each of these csv file into dataframes (I have tried both pandas & polars just to see any difference in computation time) & apply some transformation for data cleaning per dataframe.

What's the efficient way of doing this as currently my total computation time, if I use list comprehension or map or even for loops, is ~3mins to convert all ~1K csv files to dataframes & ~2mins to do transformation per dataframe (i.e 2x 1K = 2K mins for all 1K dataframes)? (I am using Python3.11)

Below are more details of what I have tried so far.

A snippet of my csv (only mentioned few rows & columns here to give an idea) which is converted to dataframe looks like this (My actual csv has ~10K rows & 400 columns per file)

df = pl.from_repr("""
┌───────┬───────────────┬────────────────────┐
│ Index ┆ A             ┆ B                  │
│ ---   ┆ ---           ┆ ---                │
│ i64   ┆ str           ┆ str                │
╞═══════╪═══════════════╪════════════════════╡
│ 0     ┆ 203 X 345 457 ┆ 346 X X 457 45 46  │
│ 0     ┆ 11 22 44 890  ┆ 22 33 44 88 90 100 │
│ 0     ┆ X X 456 44 90 ┆ null               │
│ 1     ┆ null          ┆ 33 456 99 10 10 11 │
│ 1     ┆ null          ┆ null               │
└───────┴───────────────┴────────────────────┘
""")

So basically I want to transform them into something like this:

┌───────┬─────────────────────────────────┬────────────────────────────────────────┐
│ Index ┆ A                               ┆ B                                      │
│ ---   ┆ ---                             ┆ ---                                    │
│ i64   ┆ list[f64]                       ┆ list[f64]                              │
╞═══════╪═════════════════════════════════╪════════════════════════════════════════╡
│ 0     ┆ [203.0, null, 345.0, 457.0]     ┆ [346.0, null, null, 457.0, 45.0, 46.0] │
│ 0     ┆ [11.0, 22.0, 44.0, 890.0]       ┆ [22.0, 33.0, 44.0, 88.0, 90.0, 100.0]  │
│ 0     ┆ [null, null, 456.0, 44.0, 90.0] ┆ null                                   │
│ 1     ┆ null                            ┆ [33.0, 456.0, 99.0, 10.0, 10.0, 11.0]  │
│ 1     ┆ null                            ┆ null                                   │
└───────┴─────────────────────────────────┴────────────────────────────────────────┘

My code (this is in polars) so far looks like this:

import polars as pl  

def convertdf(input):  
    return pl.read_csv(input)  

def applycast(df,col):  
    dfcast     = df.explode(col).select('Index',pl.col(col)).with_columns(pl.col(col).is_not_null()).then(pl.col(col).cast(pl.Float64,strict=False))).group_by('Index').agg(pl.col(col)).sort('Index')  
    df = df.replace(col,    dfcast[col]) 
    return df  

def datatransform(df):  
   df =     df.select(df.columns[1:]).select(pl.col(pl.String).str.split(' ')) 
   df = df.pipe(lambda df : reduce(lambda df,col: applycast(df,col), df.columns,df))  
   return df  

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv'] 
df_files = list(map(convertdf, csvfiles)) #Time taken: 3mins 
dftransformedfiles = [df.pipe(datatransform) for df in df_files] #Time taken: ~2K mins 

Basically, as you can see I am using list comprehension to loop over each csv file. Is there any way possible to do parallel execution for this?

Is there any way to apply "applycast()" function to all columns in one shot? Since, currently I am looping over each column, which seems to me the reason why its taking much longer time. Though my contents of each column varies, but datatype is List[str] which needs to be transformed to List[f64].

I tried to concatenate all dataframes before applying datatransform(), but concatenation took even longer time. I thought of using "Ray" API for parallel execution but it doesnt support latest Python3.11. How can I reduce the computation time or in which is the best possible way to iterate over multiple columns or the multiple list of dataframes/csvs?


Solution

  • I think pipe in general with UDFs is guaranteed to be slow. And replace is not needed here.

    I would do the operation you're looking for with list.eval, which treats every list in the column as its own Series:

    df = pl.DataFrame({"x": [0, 1], "y": ["3 X 4.0", None], "z": ["1 X 2", "X X 5"]})
    df.with_columns(
        pl.col(pl.String)
        .str.split(" ")
        .list.eval(pl.element().cast(pl.Int64, strict=False))
    )
    
    shape: (2, 3)
    ┌─────┬─────────────────┬─────────────────┐
    │ x   ┆ y               ┆ z               │
    │ --- ┆ ---             ┆ ---             │
    │ i64 ┆ list[i64]       ┆ list[i64]       │
    ╞═════╪═════════════════╪═════════════════╡
    │ 0   ┆ [3, null, null] ┆ [1, null, 2]    │
    │ 1   ┆ null            ┆ [null, null, 5] │
    └─────┴─────────────────┴─────────────────┘
    

    So wrapping that with_columns operation to a method, see what the speedup is then:

    def data_load_transform(filename):
      return pd.read_csv(filename).with_columns(
          pl.col(pl.String)
          .str.split(" ")
          .list.eval(pl.element().cast(pl.Int64, strict=False))
      )
    
    csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv'] 
    dftransformedfiles = [data_load_transform(fn) for fn in csvfiles]
    

    If you need further speedup, the list comprehension at the end can be done in parallel in various multiprocessing ways.