I'm working in PySpark on a dataframe that shows how borrowers repay their loans.
I'm new to PySpark and would like some help with a fairly complex filtering operation.
My dataframe is:
ID   ContractDate  Loansum  Debt  MaturityDate  Bank
ID1  2024-06-01    100      10    2024-06-18    A
ID1  2024-06-05    50       20    2024-06-17    B
ID1  2024-06-10    50       20    2024-06-15    C
ID1  2024-06-10    90       70    2024-06-30    D
ID1  2024-06-15    50       50                  R
ID1  2024-07-15    100      90                  D
ID2  2024-08-01    70       20    2024-09-01    A
ID2  2024-08-08    50       10    2024-08-26    B
ID2  2024-08-20    32       32                  R
ID3  2024-09-01    60       10    2024-09-24    A
ID3  2024-09-03    40       10    2024-09-23    B
ID3  2024-09-22    22       22                  R
ID4  2024-10-03    40       10    2024-10-23    B
ID5  2024-11-01    70       20    2024-11-21    A
ID5  2024-11-08    50       10    2024-11-22    B
ID5  2024-11-20    40       40                  R
I want to filter rows on several criteria at once.
For each unique ID, I want to keep the rows only if:
Expected result:
ID   ContractDate  Loansum  Debt  MaturityDate  Bank  Marker
ID1  2024-06-01    100      10    2024-06-18    A     Previous
ID1  2024-06-05    50       20    2024-06-17    B     Previous
ID1  2024-06-10    50       20    2024-06-15    C     Previous
ID1  2024-06-15    50       50                  R     Last
ID3  2024-09-01    60       10    2024-09-24    A     Previous
ID3  2024-09-03    40       10    2024-09-23    B     Previous
ID3  2024-09-22    22       22                  R     Last
Any help is highly appreciated!
You can do this with window operations, but for this particular problem I feel that joins are more intuitive (and we take advantage of the fact that R only occurs once per ID).
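First we get the contract date of the bank R loan for each ID, join that back to the original dataframe, keep only the rows dated on or before it, and create the Marker column. Because the join is an inner join, IDs without an R loan (ID4 here) drop out: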
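from pyspark.sql import functions as F

# Contract date of the bank R loan for each ID
maxDateDF = sparkDF.filter(F.col('Bank') == 'R').groupby('ID').agg(F.max('ContractDate').alias('RContractDate'))

# Inner join, then keep rows dated on or before the R contract and label them
sparkDF = sparkDF.join(
    maxDateDF, on=['ID']
).filter(
    F.col('ContractDate') <= F.col('RContractDate')
).withColumn(
    'Marker', F.when(F.col('Bank') == 'R', 'Last').otherwise('Previous')
)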
+---+------------+-------+----+------------+----+-------------+--------+
| ID|ContractDate|Loansum|Debt|MaturityDate|Bank|RContractDate|  Marker|
+---+------------+-------+----+------------+----+-------------+--------+
|ID1|  2024-06-01|    100|  10|  2024-06-18|   A|   2024-06-15|Previous|
|ID1|  2024-06-05|     50|  20|  2024-06-17|   B|   2024-06-15|Previous|
|ID1|  2024-06-10|     50|  20|  2024-06-15|   C|   2024-06-15|Previous|
|ID1|  2024-06-10|     90|  70|  2024-06-30|   D|   2024-06-15|Previous|
|ID1|  2024-06-15|     50|  50|        NULL|   R|   2024-06-15|    Last|
|ID2|  2024-08-01|     70|  20|  2024-09-01|   A|   2024-08-20|Previous|
|ID2|  2024-08-08|     50|  10|  2024-08-26|   B|   2024-08-20|Previous|
|ID2|  2024-08-20|     32|  32|        NULL|   R|   2024-08-20|    Last|
|ID3|  2024-09-01|     60|  10|  2024-09-24|   A|   2024-09-22|Previous|
|ID3|  2024-09-03|     40|  10|  2024-09-23|   B|   2024-09-22|Previous|
|ID3|  2024-09-22|     22|  22|        NULL|   R|   2024-09-22|    Last|
|ID5|  2024-11-01|     70|  20|  2024-11-21|   A|   2024-11-20|Previous|
|ID5|  2024-11-08|     50|  10|  2024-11-22|   B|   2024-11-20|Previous|
|ID5|  2024-11-20|     40|  40|        NULL|   R|   2024-11-20|    Last|
+---+------------+-------+----+------------+----+-------------+--------+
Notice that the above sparkDF doesn't satisfy conditions (1) or (2) yet. It is only a precursor that keeps the records dated on or before the R contract.
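We start with condition (2): the R loan's Loansum must be no more than 10% higher than the summed Debt of the loans that meet condition (1). In the code, condition (1) is the datediff filter, i.e. MaturityDate falls no more than 5 days after RContractDate. So we first compute 1.10 times that summed Debt for each ID: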
# Rows with a NULL MaturityDate (the R rows) give a NULL datediff and are dropped here
sparkDFcond = sparkDF.filter(
    F.datediff('MaturityDate', 'RContractDate') <= 5
).groupby(
    'ID'
).agg(
    (1.10 * F.sum('Debt')).alias('MaxTotalDebt')
)
+---+-----------------+
| ID|     MaxTotalDebt|
+---+-----------------+
|ID1|55.00000000000001|
|ID3|             22.0|
|ID5|             33.0|
+---+-----------------+
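To make those thresholds concrete, the same arithmetic can be reproduced in plain Python. This is only a hand check using values read off the tables above; the dictionary names are mine.

```python
# Debt of the loans that pass the 5-day filter, per ID (read off the table above)
debts_near_r = {"ID1": [10, 20, 20], "ID3": [10, 10], "ID5": [20, 10]}
# Loansum of the bank R loan, per ID
r_loansum = {"ID1": 50, "ID3": 22, "ID5": 40}

# Same formula as the aggregation: 1.10 times the summed Debt
max_total_debt = {id_: 1.10 * sum(debts) for id_, debts in debts_near_r.items()}

# Condition (2): the R loan's Loansum must not exceed that threshold
valid_ids = [id_ for id_ in debts_near_r if r_loansum[id_] <= max_total_debt[id_]]

print(max_total_debt)
print(valid_ids)  # ['ID1', 'ID3']
```

ID5 fails because its R loan (40) is above the threshold of 33, while ID3 passes exactly at the boundary (22 against 22.0).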
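ID2 is missing from sparkDFcond because none of its loans mature within 5 days of its R contract, so the inner join below drops it as well. By joining sparkDFcond back to the earlier sparkDF, we can figure out which IDs meet condition (2):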
# Compare the R loan's Loansum with the threshold for its ID
validID = sparkDF.join(
    sparkDFcond, on=['ID']
).filter(
    (F.col('Bank') == 'R') &
    (F.col('Loansum') <= F.col('MaxTotalDebt'))
).select('ID')
Finally, we enforce condition (1) by keeping only the loans that pass the 5-day filter, plus the R row itself, and join the result with the valid IDs from condition (2):
validDF = sparkDF.filter(
    (F.datediff('MaturityDate', 'RContractDate') <= 5) | (F.col('Bank') == 'R')
).join(validID, on=['ID'])
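To see which of ID1's loans survive the 5-day filter, the day differences can be checked by hand with the standard datetime module. This is a plain-Python illustration of what F.datediff('MaturityDate', 'RContractDate') returns for those rows, not part of the Spark pipeline; the variable names are mine.

```python
from datetime import date

# ContractDate of ID1's bank R loan
r_contract_date = date(2024, 6, 15)

# MaturityDate of ID1's loans dated on or before the R contract
maturity_dates = {
    "A": date(2024, 6, 18),
    "B": date(2024, 6, 17),
    "C": date(2024, 6, 15),
    "D": date(2024, 6, 30),
}

# Days from the R contract date to each maturity date (end minus start)
days_after_r = {bank: (m - r_contract_date).days for bank, m in maturity_dates.items()}

# Keep the loans that mature at most 5 days after the R contract
kept = [bank for bank, days in days_after_r.items() if days <= 5]

print(days_after_r)  # {'A': 3, 'B': 2, 'C': 0, 'D': 15}
print(kept)          # ['A', 'B', 'C']
```

The D loan matures 15 days after the R contract, which is why it does not appear in the filtered dataframe.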
Final result (validDF):
+---+------------+-------+----+------------+----+-------------+--------+
| ID|ContractDate|Loansum|Debt|MaturityDate|Bank|RContractDate|  Marker|
+---+------------+-------+----+------------+----+-------------+--------+
|ID1|  2024-06-01|    100|  10|  2024-06-18|   A|   2024-06-15|Previous|
|ID1|  2024-06-05|     50|  20|  2024-06-17|   B|   2024-06-15|Previous|
|ID1|  2024-06-10|     50|  20|  2024-06-15|   C|   2024-06-15|Previous|
|ID1|  2024-06-15|     50|  50|        NULL|   R|   2024-06-15|    Last|
|ID3|  2024-09-01|     60|  10|  2024-09-24|   A|   2024-09-22|Previous|
|ID3|  2024-09-03|     40|  10|  2024-09-23|   B|   2024-09-22|Previous|
|ID3|  2024-09-22|     22|  22|        NULL|   R|   2024-09-22|    Last|
+---+------------+-------+----+------------+----+-------------+--------+