I am new to PySpark and am trying to implement row-wise transformations of a large dataframe, based on nested conditional logic. Basically, I need much more complicated versions of the code given below (deeper nesting of logic, more columns filled based on the same logic).
I am interested in alternatives or improvements to the given code, in terms of efficiency and readability. Or should I perhaps use a different tool than PySpark?
In particular, I am hoping for help with the following issues:
The code looks ugly. If I use this syntax, the end result will be an unreadable nightmare.
I often need to fill multiple columns based on the same block of conditional logic. To this end, I currently go through the same logic twice (or more). Is there a better way to do this? I suppose I could create an intermediate integer column 'logicOutcome' and then fill the new columns based on only that one column, but that sounds wasteful in terms of memory usage and access. I would prefer a way to define multiple columns at once, such as with Polars' struct.
I often need to treat missing values as 0, or sometimes 1, but I usually cannot just overwrite them in the existing data. Is there a shorter way than writing when ... isNull() ... all over the place, as below? (In Polars, for instance, I can at least just write .fill_null(0).)
Is there a way to wrap some of this logic into code blocks and include them from elsewhere, for better readability? Preferably with nicer syntax? For this I could use Python functions with PySpark's UDFs, but what I tried so far with those was wildly inefficient (constantly switching between parallel Spark execution and sequential Python).
For my application, very often none of the logical cases apply and the correct thing to do is nothing. As far as I can see, with the syntax below I would have to write sparkDF.withColumn('myColumn', ... [some complicated logic] ... .otherwise(sparkDF['myColumn'])). Does Spark recognize that it does not have to do anything in such a case, or does it overwrite the value in myColumn with itself? (Does that make much of a difference for efficiency if, say, in 95% of cases nothing needs to change?)
This is the type of code I have so far. I tried to use column names and conditions that make at least a bit of sense to a general audience.
from pyspark.sql import functions as F
# Define the transformation logic
sparkDF = sparkDF.withColumn('total_sales',
    F.when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
           (sparkDF['store_location'].isin(['NY', 'LA'])),
           F.when(sparkDF['instore_sales'].isNull(), 0).otherwise(sparkDF['instore_sales']) +
           F.when(sparkDF['online_sales'].isNull(), 0).otherwise(sparkDF['online_sales']))
     .when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
           (~sparkDF['store_location'].isin(['NY', 'LA'])),
           F.when(sparkDF['instore_sales'].isNull(), 0).otherwise(sparkDF['instore_sales']) -
           F.when(sparkDF['returns'].isNull(), 0).otherwise(sparkDF['returns']))
     .otherwise(None))

sparkDF = sparkDF.withColumn('customer_satisfaction',
    F.when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
           (sparkDF['store_location'].isin(['NY', 'LA'])),
           F.when(sparkDF['positive_reviews'].isNull(), 0).otherwise(sparkDF['positive_reviews']) +
           F.when(sparkDF['neutral_reviews'].isNull(), 0).otherwise(sparkDF['neutral_reviews']))
     .when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
           (~sparkDF['store_location'].isin(['NY', 'LA'])),
           F.when(sparkDF['positive_reviews'].isNull(), 0).otherwise(sparkDF['positive_reviews']) -
           F.when(sparkDF['negative_reviews'].isNull(), 0).otherwise(sparkDF['negative_reviews']))
     .otherwise(F.when(sparkDF['positive_reviews'].isNull(), 0).otherwise(sparkDF['positive_reviews'])))
I think using F.coalesce will do most of the work for you, but I will add some other steps as well. In the first step, I use F.col to reference columns, pull the repeated filter conditions out into named variables, and replace the when(col.isNull(), 0).otherwise(col) pattern with F.coalesce.
electronic_or_clothing = F.col('product_type').isin(['Electronics', 'Clothing'])
in_ny_la = F.col('store_location').isin(['NY', 'LA'])
sparkDF = (
    sparkDF
    .withColumn(
        'total_sales',
        F.when(
            electronic_or_clothing & in_ny_la,
            F.coalesce(F.col('instore_sales'), F.lit(0)) +
            F.coalesce(F.col('online_sales'), F.lit(0))
        ).when(
            electronic_or_clothing & (~in_ny_la),
            F.coalesce(F.col('instore_sales'), F.lit(0)) -
            F.coalesce(F.col('returns'), F.lit(0))
        )
        .otherwise(None)
    )
    .withColumn(
        'customer_satisfaction',
        F.when(
            electronic_or_clothing & in_ny_la,
            F.coalesce(F.col('positive_reviews'), F.lit(0)) +
            F.coalesce(F.col('neutral_reviews'), F.lit(0))
        ).when(
            electronic_or_clothing & (~in_ny_la),
            F.coalesce(F.col('positive_reviews'), F.lit(0)) -
            F.coalesce(F.col('negative_reviews'), F.lit(0))
        )
        .otherwise(
            F.coalesce(F.col('positive_reviews'), F.lit(0))
        )
    )
)
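On your point about wrapping logic into reusable blocks: this does not need a UDF. Any ordinary Python function that builds and returns a Column expression stays entirely inside Spark's expression engine; only the construction of the expression happens in Python, so there is no per-row switching between Spark and Python. A small sketch (the helper names here are mine, not an established API):
from pyspark.sql import functions as F

def filled(name, default=0):
    # Null-safe column reference: missing values are read as `default`.
    return F.coalesce(F.col(name), F.lit(default))

def net_instore_sales():
    # A reusable piece of logic; it returns a Column expression, no UDF involved.
    return filled('instore_sales') - filled('returns')
You can collect helpers like these in a module and import them wherever they are needed.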
If you are going to use the same default value for a given column everywhere, you can fill it once up front and avoid repeating the coalesce in every branch.
electronic_or_clothing = F.col('product_type').isin(['Electronics', 'Clothing'])
in_ny_la = F.col('store_location').isin(['NY', 'LA'])
sparkDF = (
    sparkDF
    .withColumn('instore_sales', F.coalesce(F.col('instore_sales'), F.lit(0)))
    .withColumn('online_sales', F.coalesce(F.col('online_sales'), F.lit(0)))
    .withColumn('returns', F.coalesce(F.col('returns'), F.lit(0)))
    .withColumn('positive_reviews', F.coalesce(F.col('positive_reviews'), F.lit(0)))
    .withColumn('neutral_reviews', F.coalesce(F.col('neutral_reviews'), F.lit(0)))
    .withColumn('negative_reviews', F.coalesce(F.col('negative_reviews'), F.lit(0)))
    .withColumn(
        'total_sales',
        F.when(electronic_or_clothing & in_ny_la, F.col('instore_sales') + F.col('online_sales'))
        .when(electronic_or_clothing & (~in_ny_la), F.col('instore_sales') - F.col('returns'))
        .otherwise(None)
    )
    .withColumn(
        'customer_satisfaction',
        F.when(electronic_or_clothing & in_ny_la, F.col('positive_reviews') + F.col('neutral_reviews'))
        .when(electronic_or_clothing & (~in_ny_la), F.col('positive_reviews') - F.col('negative_reviews'))
        .otherwise(F.col('positive_reviews'))
    )
)
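If 0 is the right default for all of these columns, the six coalesce calls above can also be collapsed into a single fillna call (a sketch; keep subset restricted to the columns you are actually allowed to overwrite):
sparkDF = sparkDF.fillna(
    0,
    subset=[
        'instore_sales', 'online_sales', 'returns',
        'positive_reviews', 'neutral_reviews', 'negative_reviews',
    ],
)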
Finally, revise the conditions and how you combine them. How far you can take this depends on your actual conditions; you may have more of them that you left out to keep the question short. For the code shown, nesting the in_ny_la check inside the electronic_or_clothing branch avoids writing the same filter twice.
electronic_or_clothing = F.col('product_type').isin(['Electronics', 'Clothing'])
in_ny_la = F.col('store_location').isin(['NY', 'LA'])
sparkDF = (
    sparkDF
    .withColumn('instore_sales', F.coalesce(F.col('instore_sales'), F.lit(0)))
    .withColumn('online_sales', F.coalesce(F.col('online_sales'), F.lit(0)))
    .withColumn('returns', F.coalesce(F.col('returns'), F.lit(0)))
    .withColumn('positive_reviews', F.coalesce(F.col('positive_reviews'), F.lit(0)))
    .withColumn('neutral_reviews', F.coalesce(F.col('neutral_reviews'), F.lit(0)))
    .withColumn('negative_reviews', F.coalesce(F.col('negative_reviews'), F.lit(0)))
    .withColumn(
        'total_sales',
        F.when(
            electronic_or_clothing,
            F.when(in_ny_la, F.col('instore_sales') + F.col('online_sales'))
            .otherwise(F.col('instore_sales') - F.col('returns'))
        ).otherwise(None)
    )
    .withColumn(
        'customer_satisfaction',
        F.when(
            electronic_or_clothing,
            F.when(in_ny_la, F.col('positive_reviews') + F.col('neutral_reviews'))
            .otherwise(F.col('positive_reviews') - F.col('negative_reviews'))
        ).otherwise(F.col('positive_reviews'))
    )
)
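As for defining multiple columns from one block of conditional logic (your second point): one option is to build a single struct column inside the when and pull its fields out afterwards, so the conditions are written only once. The following is a sketch on top of the null-filled columns and condition variables from above; it assumes the sales and review columns have compatible numeric types, because every branch of the when has to produce a struct of the same type (cast where necessary):
outcome = (
    F.when(
        electronic_or_clothing & in_ny_la,
        F.struct(
            (F.col('instore_sales') + F.col('online_sales')).alias('total_sales'),
            (F.col('positive_reviews') + F.col('neutral_reviews')).alias('customer_satisfaction'),
        ),
    )
    .when(
        electronic_or_clothing & ~in_ny_la,
        F.struct(
            (F.col('instore_sales') - F.col('returns')).alias('total_sales'),
            (F.col('positive_reviews') - F.col('negative_reviews')).alias('customer_satisfaction'),
        ),
    )
    .otherwise(
        F.struct(
            # Adjust this cast to whatever type your sales columns actually have.
            F.lit(None).cast('double').alias('total_sales'),
            F.col('positive_reviews').alias('customer_satisfaction'),
        )
    )
)

sparkDF = (
    sparkDF
    .withColumn('outcome', outcome)
    # Unpack the struct fields into ordinary top-level columns.
    .withColumn('total_sales', F.col('outcome.total_sales'))
    .withColumn('customer_satisfaction', F.col('outcome.customer_satisfaction'))
    .drop('outcome')
)
Whether this is nicer than repeating the named conditions is a matter of taste; it mostly pays off once the same branching feeds many output columns.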