
Generic coalesce of multiple columns in a PySpark join


I have to merge many Spark DataFrames. After the merge, I want to coalesce the columns that share the same name.

I was able to create a minimal example following this question.

However, I need more generic code that supports a set of variables to coalesce (in the example, set_vars = {'var1', 'var2'}) and multiple join keys (in the example, join_keys = {'id'}).
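A small pitfall when writing these parameter sets: a one-element set cannot be written as `set(('id'))`, because `('id')` is only the string `'id'` in parentheses (a one-element tuple needs a trailing comma), so `set()` iterates over its characters. A quick check in plain Python:

```python
# ('id') is just the string 'id' in parentheses, so set() iterates its characters
wrong = set(('id'))
# A trailing comma makes a one-element tuple; a set literal is simpler still
right_tuple = set(('id',))
right_literal = {'id'}

print(sorted(wrong))                 # ['d', 'i']
print(right_tuple == right_literal)  # True

# The parameters from the question, written as set literals
set_vars = {'var1', 'var2'}
join_keys = {'id'}
```

When passing the keys to `DataFrame.join`, convert them to a list first (for example `sorted(join_keys)`), since its `on` argument is documented as a string, a list of strings, a Column, or a list of Columns.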

Is there a less verbose (more generic) way to obtain this result in PySpark?

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([
    (1, None, "aa"),
    (2, "a", None),
    (3, "b", None),
    (4, "h", None)],
    "id int, var1 string, var2 string",
)

df2 = spark.createDataFrame([
    (1, "f", "Ba"),
    (2, "a", "bb"),
    (3, "b", None)],
    "id int, var1 string, var2 string",
)

df1 = df1.alias("df1")
df2 = df2.alias("df2")

# Joining on an expression keeps both df1.id and df2.id in the result
df3 = (
    df1.join(df2, df1.id == df2.id, how="left")
    .withColumn("var1_", coalesce("df1.var1", "df2.var1"))
    .drop("var1")
    .withColumnRenamed("var1_", "var1")
    .withColumn("var2_", coalesce("df1.var2", "df2.var2"))
    .drop("var2")
    .withColumnRenamed("var2_", "var2")
)

Solution

  • We can avoid the duplicated join key column by passing the key columns to the join method as a list instead of writing a join condition (refer to this link). However, here there are other common columns that are not part of the join condition, so we can use a for loop to generalize your code.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import coalesce
    
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    
    df1 = spark.createDataFrame([
        (1, None, "aa"),
        (2, "a", None),
        (3, "b", None),
        (4, "h", None)],
        "id int, var1 string, var2 string",
    )
    
    df2 = spark.createDataFrame([
        (1, "f", "Ba"),
        (2, "a", "bb"),
        (3, "b", None)],
        "id int, var1 string, var2 string",
    )
    
    df1 = df1.alias("df1")
    df2 = df2.alias("df2")
    
    key_columns = ["id"]
    # Columns present in both DataFrames, excluding the join key columns
    other_common_columns = (
        set(df1.columns)
        .intersection(df2.columns)
        .difference(key_columns)
    )
    
    outputDF = df1.join(df2, key_columns, how='left')
    
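    for i in other_common_columns:
        # Prefer df1's value and fall back to df2's when df1's is null;
        # drop(i) removes both original columns with that name
        outputDF = (
            outputDF.withColumn(f"{i}_", coalesce(f"df1.{i}", f"df2.{i}"))
            .drop(i)
            .withColumnRenamed(f"{i}_", i)
        )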
    
    outputDF.show()
    
    +---+----+----+
    | id|var2|var1|
    +---+----+----+
    |  1|  aa|   f|
    |  3|null|   b|
    |  4|null|   h|
    |  2|  bb|   a|
    +---+----+----+