python-polars, pandera

How to create a column check with a filter condition in Pandera (Polars or pandas)?


I am trying to use the Pandera library to define data quality checks on a dataset. The dataset is loaded as a Polars DataFrame, and Pandera's Polars API is used to define the checks.

For some column checks, we need a joint condition on multiple columns. For example,

given a file with the columns BUY_SELL (str, B or S), PRODUCT_TYPE (str, REV or XX), and QUANTITY (float),

I would like to define the following check (as encoded in the attempts below): when PRODUCT_TYPE is REV and BUY_SELL is B, QUANTITY must be greater than 10.

I came up with the two custom checks below using pandas (not Polars), but they do not work as expected. In Polars I could not write an equivalent, as lambdas do not seem to be available there.

import pandera as pa
import pandas as pd
import warnings

df = pd.read_csv('file.csv')


check_quantity_grouped = pa.Check(
    lambda g: g[(True, "B")] > 10,
    groupby=lambda df: (
        df.assign(product_type_rev=lambda d: d["PRODUCT_TYPE"] == "REV")
        .groupby(["product_type_rev", "BUY_SELL"])
    ),
    ignore_na=True,
    raise_warning=False,
    error="trade quantity, when product_type = REV and BUY_SELL = B, is less than 10",
)

check_quantity_filter = pa.Check(
    lambda df: df[(df['PRODUCT_TYPE'] == 'REV') & (df['BUY_SELL'] == 'B')] > 10,
    ignore_na=True,
    raise_warning=False,
)


schema = pa.DataFrameSchema({
    "QUANTITY": pa.Column(float, [check_quantity_filter, check_quantity_grouped], nullable=True)
})


try:
    schema(df, lazy=True)
except pa.errors.SchemaErrors as exc:
    failures = exc.failure_cases
    filtered_df = df[df.index.isin(failures["index"])]
    # Print inside the handler: both names only exist when validation fails.
    print(f"filtered df:\n{filtered_df}")
    print(failures)

My question is whether more complicated checks such as the ones above are possible in Pandera (ideally with Polars, but otherwise with pandas).


Solution

  • I've not used Pandera before, so it's possible this could be improved.

    According to the examples in the Pandera documentation: https://pandera.readthedocs.io/en/latest/polars.html#dataframe-level-checks

    You define a function that receives a PolarsData object; its .lazyframe attribute holds the Polars LazyFrame being validated, so you can run your Polars code on it.

    df = pl.read_csv(b"""
    PRODUCT_TYPE,BUY_SELL,QUANTITY
    ABC,D,13
    REV,B,11
    REV,B,5
    REV,B,12
    REV,C,9
    """.strip())
    

    Your desired logic appears to be:

    df.with_columns(
        pl.when(pl.col.PRODUCT_TYPE == "REV", pl.col.BUY_SELL == "B")
          .then(pl.col.QUANTITY > 10)
          .otherwise(True)
    )
    
    shape: (5, 3)
    ┌──────────────┬──────────┬──────────┐
    │ PRODUCT_TYPE ┆ BUY_SELL ┆ QUANTITY │
    │ ---          ┆ ---      ┆ ---      │
    │ str          ┆ str      ┆ bool     │
    ╞══════════════╪══════════╪══════════╡
    │ ABC          ┆ D        ┆ true     │
    │ REV          ┆ B        ┆ true     │
    │ REV          ┆ B        ┆ false    │
    │ REV          ┆ B        ┆ true     │
    │ REV          ┆ C        ┆ true     │
    └──────────────┴──────────┴──────────┘
    

    Put into Pandera:

    import polars as pl
    import pandera.polars as pa
    from   pandera.polars import PolarsData
    
    df = pl.read_csv(b"""
    PRODUCT_TYPE,BUY_SELL,QUANTITY
    ABC,D,13
    REV,B,11
    REV,B,5
    REV,B,12
    REV,C,9
    """.strip())
    
    def check_quantity_filter(data: PolarsData) -> pl.LazyFrame:
        return data.lazyframe.select(
            pl.when(pl.col.PRODUCT_TYPE == "REV", pl.col.BUY_SELL == "B")
              .then(pl.col.QUANTITY > 10)
              .otherwise(True)
        )
    
    schema = pa.DataFrameSchema({
        "QUANTITY": pa.Column(float, checks=[pa.Check(check_quantity_filter)], nullable=True)
    })
    
    try:
        schema(df, lazy=True)
    except pa.errors.SchemaErrors as exc:
        failures = exc.failure_cases
        print(failures)
    

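    It gives two failure cases: a dtype failure, because the sample QUANTITY values are read as Int64 while the schema declares float, and the custom check flagging the REV/B row with QUANTITY 5 (index 2):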

    shape: (2, 6)
    ┌──────────────┬────────────────┬──────────┬───────────────────────┬──────────────┬───────┐
    │ failure_case ┆ schema_context ┆ column   ┆ check                 ┆ check_number ┆ index │
    │ ---          ┆ ---            ┆ ---      ┆ ---                   ┆ ---          ┆ ---   │
    │ str          ┆ str            ┆ str      ┆ str                   ┆ i32          ┆ i32   │
    ╞══════════════╪════════════════╪══════════╪═══════════════════════╪══════════════╪═══════╡
    │ Int64        ┆ Column         ┆ QUANTITY ┆ dtype('Float64')      ┆ null         ┆ null  │
    │ 5            ┆ Column         ┆ QUANTITY ┆ check_quantity_filter ┆ 0            ┆ 2     │
    └──────────────┴────────────────┴──────────┴───────────────────────┴──────────────┴───────┘