I am trying to use the Pandera library to define data quality checks on a dataset. The dataset is loaded as a Polars DataFrame, and Pandera's Polars integration (pandera.polars) is used to define the checks.
For some column checks, we need a joint condition on multiple columns. For example, using a file with the columns BUY_SELL (str, B or S), PRODUCT_TYPE (str, REV or XX) and QUANTITY (float), I would like to define the following check: when PRODUCT_TYPE is REV and BUY_SELL is B, QUANTITY must be greater than 10.
I tried to write two custom checks, shown below, using pandas (not Polars), but they do not work as expected. In Polars I could not write anything equivalent, because the same lambda-based approach does not seem to be available.
import pandera as pa
import pandas as pd
import warnings
df = pd.read_csv('file.csv')
check_quantity_grouped = pa.Check(
    lambda g: g[(True, "B")] > 10,
    groupby=lambda df: (
        df.assign(product_type_rev=lambda d: d["PRODUCT_TYPE"] == "REV")
        .groupby(["product_type_rev", "BUY_SELL"])
    ),
    ignore_na=True,
    raise_warning=False,
    error="trade quantity, when product_type = REV and BUY_SELL = B, is less than 10",
)
check_quantity_filter = pa.Check(
    lambda df: df[(df['PRODUCT_TYPE'] == 'REV') & (df['BUY_SELL'] == 'B')] > 10,
    ignore_na=True,
    raise_warning=False,
)
schema = pa.DataFrameSchema({
"QUANTITY": pa.Column(float, [check_quantity_filter, check_quantity_grouped], nullable=True)
})
try:
    schema(df, lazy=True)
except pa.errors.SchemaErrors as exc:
    filtered_df = df[df.index.isin(exc.failure_cases["index"])]
    failures = exc.failure_cases
    print(f"filtered df:\n{filtered_df}")
    print(failures)
My question is whether more complicated checks like this one are possible in Pandera (ideally with Polars, but otherwise with pandas).
I've not used Pandera before, so it's possible this could be improved.
According to these examples: https://pandera.readthedocs.io/en/latest/polars.html#dataframe-level-checks, you define a function that is passed a data object. Its .lazyframe attribute holds a Polars LazyFrame on which you can run your Polars code.
import polars as pl

df = pl.read_csv(b"""
PRODUCT_TYPE,BUY_SELL,QUANTITY
ABC,D,13
REV,B,11
REV,B,5
REV,B,12
REV,C,9
""".strip())
Your desired logic appears to be:
df.with_columns(
pl.when(pl.col.PRODUCT_TYPE == "REV", pl.col.BUY_SELL == "B")
.then(pl.col.QUANTITY > 10)
.otherwise(True)
)
shape: (5, 3)
┌──────────────┬──────────┬──────────┐
│ PRODUCT_TYPE ┆ BUY_SELL ┆ QUANTITY │
│ --- ┆ --- ┆ --- │
│ str ┆ str ┆ bool │
╞══════════════╪══════════╪══════════╡
│ ABC ┆ D ┆ true │
│ REV ┆ B ┆ true │
│ REV ┆ B ┆ false │
│ REV ┆ B ┆ true │
│ REV ┆ C ┆ true │
└──────────────┴──────────┴──────────┘
Put into Pandera:
import polars as pl
import pandera.polars as pa
from pandera.polars import PolarsData
df = pl.read_csv(b"""
PRODUCT_TYPE,BUY_SELL,QUANTITY
ABC,D,13
REV,B,11
REV,B,5
REV,B,12
REV,C,9
""".strip())
def check_quantity_filter(data: PolarsData) -> pl.LazyFrame:
    return data.lazyframe.select(
        pl.when(pl.col.PRODUCT_TYPE == "REV", pl.col.BUY_SELL == "B")
        .then(pl.col.QUANTITY > 10)
        .otherwise(True)
    )
schema = pa.DataFrameSchema({
"QUANTITY": pa.Column(float, checks=[pa.Check(check_quantity_filter)], nullable=True)
})
try:
    schema(df, lazy=True)
except pa.errors.SchemaErrors as exc:
    failures = exc.failure_cases
    print(failures)
It gives:
shape: (2, 6)
┌──────────────┬────────────────┬──────────┬───────────────────────┬──────────────┬───────┐
│ failure_case ┆ schema_context ┆ column ┆ check ┆ check_number ┆ index │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ i32 ┆ i32 │
╞══════════════╪════════════════╪══════════╪═══════════════════════╪══════════════╪═══════╡
│ Int64 ┆ Column ┆ QUANTITY ┆ dtype('Float64') ┆ null ┆ null │
│ 5 ┆ Column ┆ QUANTITY ┆ check_quantity_filter ┆ 0 ┆ 2 │
└──────────────┴────────────────┴──────────┴───────────────────────┴──────────────┴───────┘