I am trying to refactor (translate) a given SQL query into a Python script using the Polars library.
I am stuck on one line of the query, where the ROW_NUMBER()
window function is used with an OVER(PARTITION BY)
clause.
Below is the table schema:
product_id (INTEGER)
variant_id (INTEGER)
client_code (VARCHAR)
transaction_date (DATE)
customer_id (INTEGER)
store_id (INTEGER)
invoice_id (VARCHAR)
invoice_line_id (INTEGER)
quantity (NUMERIC)
net_sales_price (NUMERIC)
Below is the SQL query:
SELECT
product_id,
variant_id,
client_code,
transaction_date,
ROW_NUMBER() OVER(
PARTITION BY
product_id, variant_id, store_id, customer_id, client_code
ORDER BY
transaction_date ASC,
invoice_id ASC,
invoice_line_id ASC,
quantity DESC,
net_sales_price ASC
) AS repeat_purchase_seq
FROM transactions
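For reference, here is a minimal frame with the schema above (made-up values), in case anyone wants to reproduce this:
import polars as pl
from datetime import date

# made-up sample data matching the schema above
df = pl.DataFrame({
    "product_id": [1, 1, 1, 2],
    "variant_id": [10, 10, 10, 20],
    "client_code": ["A", "A", "A", "B"],
    "transaction_date": [date(2024, 1, 1), date(2024, 1, 1), date(2024, 2, 1), date(2024, 1, 5)],
    "customer_id": [100, 100, 100, 200],
    "store_id": [5, 5, 5, 6],
    "invoice_id": ["INV1", "INV1", "INV2", "INV3"],
    "invoice_line_id": [1, 2, 1, 1],
    "quantity": [2.0, 1.0, 1.0, 3.0],
    "net_sales_price": [9.99, 19.99, 5.00, 4.99],
})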
I tried a couple of approaches, for example:
Example 1: using pl.first().cum_count().over()
new_df = (
    df
    .sort([
        'product_id', 'variant_id', 'store_id', 'customer_id', 'client_code',
        'transaction_date', 'invoice_id', 'invoice_line_id',
        pl.col('quantity').reverse(), 'net_sales_price',
    ])
    .with_columns(
        repeat_purchase_seq=pl.first()
        .cum_count()
        .over(['product_id', 'variant_id', 'store_id', 'customer_id', 'client_code'])
        .flatten()
    )
)
Example 2: using pl.rank('ordinal').over()
new_df = (
    df
    .sort(
        ['transaction_date', 'invoice_id', 'invoice_line_id', 'quantity', 'net_sales_price'],
        descending=[False, False, False, True, False],
    )
    .with_columns(
        repeat_purchase_seq=pl.struct(
            'transaction_date', 'invoice_id', 'invoice_line_id', 'quantity', 'net_sales_price'
        )
        .rank('ordinal')
        .over(['product_id', 'variant_id', 'store_id', 'customer_id', 'client_code'])
    )
)
Both examples have problems of one kind or another.
I compared the table created by the SQL query with the DataFrame created using Polars: out of 17 million rows, around 250,000 rows don't match.
So is there a better way to handle this ROW_NUMBER() OVER(PARTITION BY)
situation?
Edit - Below is the answer by @roman, which helped in my case:
partition_by_keys = ["product_id", "variant_id", "store_id", "customer_id", "client_code"]
order_by_keys = ["transaction_date", "invoice_id", "invoice_line_id", "quantity", "net_sales_price"]
order_by_descending = [False, False, False, True, False]
order_by = [-pl.col(col) if desc else pl.col(col) for col, desc in zip(order_by_keys, order_by_descending)]
df.with_columns(
    pl.struct(order_by)
    .rank("ordinal")
    .over(partition_by_keys)
    .alias("rn")
)
You could use pl.Expr.rank(),
but it applies to a single pl.Expr
/column. You can, of course, build that single column out of a sequence of columns with pl.struct()
and rank it:
partition_by_keys = ["product_id", "variant_id", "store_id", "customer_id", "client_code"]
order_by_keys = ["transaction_date", "invoice_id", "invoice_line_id", "quantity", "net_sales_price"]
df.with_columns(
    pl.struct(order_by_keys)
    .rank("ordinal")
    .over(partition_by_keys)
    .alias("rn")
)
But there's a problem with applying ascending
and descending
sorts to the struct's fields. If all fields were numeric you could use negation, but you have string-typed columns there as well. In your case you can actually do it, because the only column you want to sort descending, quantity
, is numeric:
partition_by_keys = ["product_id", "variant_id", "store_id", "customer_id", "client_code"]
order_by_keys = ["transaction_date", "invoice_id", "invoice_line_id", "quantity", "net_sales_price"]
order_by_descending = [False, False, False, True, False]
order_by = [-pl.col(col) if desc else pl.col(col) for col, desc in zip(order_by_keys, order_by_descending)]
df.with_columns(
    pl.struct(order_by)
    .rank("ordinal")
    .over(partition_by_keys)
    .alias("rn")
)
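As an aside (not part of the original answer), if one of the descending keys had been a string, a possible workaround is to replace it inside the struct with a dense rank computed with descending=True: the rank is numeric and preserves both the column's relative order and its ties, so it can stand in for the string column. A sketch, pretending invoice_id had to be sorted descending:
# sketch: hypothetical case where invoice_id (a string) must sort descending;
# a descending dense rank is a numeric, order-preserving stand-in
order_by = [
    pl.col("transaction_date"),
    pl.col("invoice_id").rank("dense", descending=True),
    pl.col("invoice_line_id"),
    -pl.col("quantity"),
    pl.col("net_sales_price"),
]
df.with_columns(
    pl.struct(order_by)
    .rank("ordinal")
    .over(partition_by_keys)
    .alias("rn")
)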
A more generic approach is to use pl.DataFrame.sort()
and pl.int_range()
. I've added pl.DataFrame.with_row_index()
and an additional sort at the end to restore the original row order.
partition_by_keys = ["product_id", "variant_id", "store_id", "customer_id", "client_code"]
order_by_keys = ["transaction_date", "invoice_id", "invoice_line_id", "quantity", "net_sales_price"]
order_by_descending = [False, False, False, True, False]
(
    df.with_row_index()
    .sort(order_by_keys, descending=order_by_descending)
    .with_columns(
        # start at 1 to match SQL's ROW_NUMBER()
        rn=pl.int_range(1, pl.len() + 1).over(partition_by_keys)
    )
    .sort("index")
    .drop("index")
)
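As a quick sanity check (a sketch, assuming the sample frame from the question and the variables defined above; note that for exact duplicate order-by keys within a partition the tie-breaking is arbitrary in both methods, so they may legitimately differ there), the two approaches should agree:
# verify the rank-based and the sort-based row numbers agree
rank_rn = df.with_columns(
    pl.struct(order_by).rank("ordinal").over(partition_by_keys).alias("rn")
)
sort_rn = (
    df.with_row_index()
    .sort(order_by_keys, descending=order_by_descending)
    .with_columns(rn=pl.int_range(1, pl.len() + 1).over(partition_by_keys))
    .sort("index")
    .drop("index")
)
# rank("ordinal") yields UInt32 while int_range yields Int64, hence the cast
assert (rank_rn["rn"].cast(pl.Int64) == sort_rn["rn"]).all()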