pyspark, apache-spark-sql

Conditional logic in pyspark


I am new to pyspark and am trying to implement row-wise transformations of a large dataframe based on nested conditional logic. Basically, I need much more complicated versions of the code given below (deeper nesting of the logic, more columns filled based on the same logic).

I am interested in alternatives or improvements to the given code, in terms of efficiency and readability. Or should I perhaps use a tool other than pyspark?

In particular, I am hoping for help with the following issues:

This is the type of code I have so far. I tried to use column names and conditions that make at least a bit of sense to a general audience.

from pyspark.sql import functions as F

# Define the transformation logic
sparkDF = sparkDF.withColumn('total_sales',
                             F.when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
                                    (sparkDF['store_location'].isin(['NY', 'LA'])), 
                                    F.when(sparkDF['instore_sales'].isNull(), 0).otherwise(sparkDF['instore_sales']) + 
                                    F.when(sparkDF['online_sales'].isNull(), 0).otherwise(sparkDF['online_sales']))
                              .when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
                                    (~sparkDF['store_location'].isin(['NY', 'LA'])), 
                                    F.when(sparkDF['instore_sales'].isNull(), 0).otherwise(sparkDF['instore_sales']) - 
                                    F.when(sparkDF['returns'].isNull(), 0).otherwise(sparkDF['returns']))
                              .otherwise(None))

sparkDF = sparkDF.withColumn('customer_satisfaction',
                             F.when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
                                    (sparkDF['store_location'].isin(['NY', 'LA'])), 
                                    F.when(sparkDF['positive_reviews'].isNull(), 0).otherwise(sparkDF['positive_reviews']) + 
                                    F.when(sparkDF['neutral_reviews'].isNull(), 0).otherwise(sparkDF['neutral_reviews']))
                              .when((sparkDF['product_type'].isin(['Electronics', 'Clothing'])) &
                                    (~sparkDF['store_location'].isin(['NY', 'LA'])), 
                                    F.when(sparkDF['positive_reviews'].isNull(), 0).otherwise(sparkDF['positive_reviews']) - 
                                    F.when(sparkDF['negative_reviews'].isNull(), 0).otherwise(sparkDF['negative_reviews']))
                              .otherwise(F.when(sparkDF['positive_reviews'].isNull(), 0).otherwise(sparkDF['positive_reviews'])))

Solution

  • I think F.coalesce will do most of the work for you, but I will add some other steps. In the first step, I use F.col to reference columns, pull the repeated filters out into named conditions, and replace the nested null checks with F.coalesce.

    electronic_or_clothing = F.col('product_type').isin(['Electronics', 'Clothing'])
    in_ny_la = F.col('store_location').isin(['NY', 'LA'])
    
    sparkDF = (
        sparkDF
        .withColumn(
            'total_sales',
            F.when(
                electronic_or_clothing & in_ny_la,
                F.coalesce(F.col('instore_sales'), F.lit(0)) +
                F.coalesce(F.col('online_sales'), F.lit(0))
            ).when(
                electronic_or_clothing & (~in_ny_la),
                F.coalesce(F.col('instore_sales'), F.lit(0)) -
                F.coalesce(F.col('returns'), F.lit(0))
            )
            .otherwise(None)
        )
        .withColumn(
            'customer_satisfaction',
            F.when(
                electronic_or_clothing & in_ny_la,
                F.coalesce(F.col('positive_reviews'), F.lit(0)) +
                F.coalesce(F.col('neutral_reviews'), F.lit(0))
            ).when(
                electronic_or_clothing & (~in_ny_la),
                F.coalesce(F.col('positive_reviews'), F.lit(0)) -
                F.coalesce(F.col('negative_reviews'), F.lit(0))
            )
            .otherwise(
                F.coalesce(F.col('positive_reviews'), F.lit(0))
            )
        )
    )
    

    If you are going to use the same default value for each column, you can apply it once up front and avoid repeating the coalesce in every branch.

    electronic_or_clothing = F.col('product_type').isin(['Electronics', 'Clothing'])
    in_ny_la = F.col('store_location').isin(['NY', 'LA'])
    
    sparkDF = (
        sparkDF
        .withColumn('instore_sales', F.coalesce(F.col('instore_sales'), F.lit(0)))
        .withColumn('online_sales', F.coalesce(F.col('online_sales'), F.lit(0)))
        .withColumn('returns', F.coalesce(F.col('returns'), F.lit(0)))
        .withColumn('positive_reviews', F.coalesce(F.col('positive_reviews'), F.lit(0)))
        .withColumn('neutral_reviews', F.coalesce(F.col('neutral_reviews'), F.lit(0)))
        .withColumn('negative_reviews', F.coalesce(F.col('negative_reviews'), F.lit(0)))
        .withColumn(
            'total_sales',
            F.when(electronic_or_clothing & in_ny_la, F.col('instore_sales') + F.col('online_sales'))
            .when(electronic_or_clothing & (~in_ny_la), F.col('instore_sales') - F.col('returns'))
            .otherwise(None)
        )
        .withColumn(
            'customer_satisfaction',
            F.when(electronic_or_clothing & in_ny_la, F.col('positive_reviews') + F.col('neutral_reviews'))
            .when(electronic_or_clothing & (~in_ny_la), F.col('positive_reviews') - F.col('negative_reviews'))
            .otherwise(F.col('positive_reviews'))
        )
    )
    
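    As a side note, if every one of those columns really shares the same default of 0, DataFrame.fillna can replace the six withColumn/coalesce calls with a single call (the column list below is just the columns from the example). Like the snippet above, this overwrites the original columns, so you lose track of which values were originally null; keep the coalesce-inside-when version if you need to preserve the raw data.

    numeric_cols = [
        'instore_sales', 'online_sales', 'returns',
        'positive_reviews', 'neutral_reviews', 'negative_reviews',
    ]

    # Replace nulls with 0 in all listed columns in one pass.
    sparkDF = sparkDF.fillna(0, subset=numeric_cols)
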

    Finally, revise the conditions and how you use them.

    How much this helps depends on your real conditions. You may have more of them, which you left out to keep the question short, but for the two branches shown, the cleanest combination is to test electronic_or_clothing once and nest the in_ny_la check inside it.

    electronic_or_clothing = F.col('product_type').isin(['Electronics', 'Clothing'])
    in_ny_la = F.col('store_location').isin(['NY', 'LA'])
    
    sparkDF = (
        sparkDF
        .withColumn('instore_sales', F.coalesce(F.col('instore_sales'), F.lit(0)))
        .withColumn('online_sales', F.coalesce(F.col('online_sales'), F.lit(0)))
        .withColumn('returns', F.coalesce(F.col('returns'), F.lit(0)))
        .withColumn('positive_reviews', F.coalesce(F.col('positive_reviews'), F.lit(0)))
        .withColumn('neutral_reviews', F.coalesce(F.col('neutral_reviews'), F.lit(0)))
        .withColumn('negative_reviews', F.coalesce(F.col('negative_reviews'), F.lit(0)))
        .withColumn(
            'total_sales',
            F.when(
                electronic_or_clothing,
                F.when(in_ny_la, F.col('instore_sales') + F.col('online_sales'))
                .otherwise(F.col('instore_sales') - F.col('returns'))
            ).otherwise(None)
        )
        .withColumn(
            'customer_satisfaction',
            F.when(
                electronic_or_clothing,
                F.when(in_ny_la, F.col('positive_reviews') + F.col('neutral_reviews'))
                .otherwise(F.col('positive_reviews') - F.col('negative_reviews'))
            ).otherwise(F.col('positive_reviews'))
        )
    )
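
    One more note on efficiency, since the real transformation adds many more columns: each withColumn call introduces another projection in the query plan, and long chains of them can make plan analysis slow. On Spark 3.3+ you can build all the derived columns in one projection with withColumns (a plain select with aliased expressions works on older versions). A minimal sketch, reusing the conditions and the already-defaulted columns from the last snippet; note too that F.when without an .otherwise already yields null for unmatched rows, so the explicit .otherwise(None) can be dropped.

    derived = {
        'total_sales': F.when(
            electronic_or_clothing,
            F.when(in_ny_la, F.col('instore_sales') + F.col('online_sales'))
            .otherwise(F.col('instore_sales') - F.col('returns'))
        ),  # no .otherwise -> null for other product types
        'customer_satisfaction': F.when(
            electronic_or_clothing,
            F.when(in_ny_la, F.col('positive_reviews') + F.col('neutral_reviews'))
            .otherwise(F.col('positive_reviews') - F.col('negative_reviews'))
        ).otherwise(F.col('positive_reviews')),
    }

    # Spark 3.3+: add/replace all derived columns in a single projection.
    sparkDF = sparkDF.withColumns(derived)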