
PySpark DataFrame - Compare multiple Dataframes' Columns with Serial Number Suffix


I am trying to solve a problem but not getting anywhere, and need help from the community.

I can join n DataFrames (all of them have the same schema). While joining, I rename the columns to add a serial number suffix. The data looks like this -

+------------------------------------+-----------+-----------+...+-----------+-----------+...+-----------+-----------+......+-----------+-----------+
|primary_key                         |Test_Col1_0|Test_Col2_0|...|Test_Col1_1|Test_Col2_1|...|Test_Col1_2|Test_Col2_2|......|Test_Col1_n|Test_Col2_n|
+------------------------------------+-----------+-----------+...+-----------+-----------+...+-----------+-----------+......+-----------+-----------+
|{10002711957914718:0000000015945968}|XYZ        |ABC        |...|PQR        |MNO        |...|JKL        |DEF        |......|SRT        |TUV        |
|{10014135598021497:0000000213113137}|XYZ        |ABC        |...|XYZ        |MNO        |...|XYZ        |DEF        |......|XYZ        |TUV        |
:
:
+------------------------------------+-----------+-----------+...+-----------+-----------+...+-----------+-----------+......+-----------+-----------+
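
A minimal setup that reproduces the joined shape above (a sketch with three sources and the dummy values from the table):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

cols = ["primary_key",
        "Test_Col1_0", "Test_Col2_0",
        "Test_Col1_1", "Test_Col2_1",
        "Test_Col1_2", "Test_Col2_2"]
rows = [("{10002711957914718:0000000015945968}", "XYZ", "ABC", "PQR", "MNO", "JKL", "DEF"),
        ("{10014135598021497:0000000213113137}", "XYZ", "ABC", "XYZ", "MNO", "XYZ", "DEF")]
joinedDataDF = spark.createDataFrame(rows, cols)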

I want to compare all the corresponding columns (Test_Col1_0, Test_Col1_1, Test_Col1_2, ..., Test_Col1_n) and generate the normalized result below. A record should appear in this result only if any of the values from the corresponding columns do not match (e.g., in the example data a record for Test_Col1 is created for the first primary key but not for the second) -

+------------------------------------+-------------+-----------------------------------------------------+
|primary_key                         |Diff_Col_Name|Diff_Col_Values                                      |
+------------------------------------+-------------+-----------------------------------------------------+
|{10002711957914718:0000000015945968}|Test_Col1    |{Src_1:XYZ}{Src_2:PQR}{Src_3:JKL}......{Src_n:SRT}   |
|{10002711957914718:0000000015945968}|Test_Col2    |{Src_1:ABC}{Src_2:MNO}{Src_3:DEF}......{Src_n:TUV}   |
|{10002711957914718:0000000015945968}|   :
|{10002711957914718:0000000015945968}|Test_ColX    |{Src_1:...}{Src_2:...}{Src_3:...}......{Src_n:...}   |

|{10014135598021497:0000000213113137}|Test_Col2    |{Src_1:ABC}{Src_2:MNO}{Src_3:DEF}......{Src_n:TUV}   |
|{10014135598021497:0000000213113137}|   :
|{10014135598021497:0000000213113137}|Test_ColX    |{Src_1:...}{Src_2:...}{Src_3:...}......{Src_n:...}   |

My idea was to create columns (say Test_Col1, Test_Col2, ..., Test_ColX) whose values are lists of the values from all the corresponding columns, so that I can dedup each list and check whether its length is more than 1 (which would mean that some of the corresponding columns hold a different value from the rest). But the withColumns method does not work on my servers (I get the error AttributeError: 'DataFrame' object has no attribute 'withColumns' - DataFrame.withColumns only exists in Spark 3.3.0+, and my cluster presumably runs an older version).
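
Concretely, the per-column check I have in mind looks like this (a sketch for three sources; Test_Col1 stands for any of the compared columns):

from pyspark.sql.functions import array, array_distinct, col, size

# True when the corresponding columns do not all hold the same value
has_diff = size(array_distinct(array(col("Test_Col1_0"),
                                     col("Test_Col1_1"),
                                     col("Test_Col1_2")))) > 1

For example, joinedDataDF.where(has_diff) would keep only the rows where Test_Col1 differs across sources.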

Any solution without the use of withColumns would be very helpful. Please help.


Solution

  • Thanks for the suggestions and tips. I was able to solve the problem last week and it is working fine. Below is the step-by-step solution (modified to remove internal information). Please let me know if I can optimize its performance in any way; that would be very helpful.

    Step 1 - Since all the DataFrames to be joined have the same column names, we have to differentiate them by renaming the columns, appending a serial number to each.

    from pyspark.sql.functions import col

    # Suffix every non-key column with the source's serial number
    src1FormatDataDF = src1DataDF.select('primary_key', *(col(x).alias(x + '0') for x in src1DataDF.columns if x != 'primary_key'))
    src2FormatDataDF = src2DataDF.select('primary_key', *(col(x).alias(x + '1') for x in src2DataDF.columns if x != 'primary_key'))
    src3FormatDataDF = src3DataDF.select('primary_key', *(col(x).alias(x + '2') for x in src3DataDF.columns if x != 'primary_key'))
    

    Step 2 - Join all the DataFrames on the primary key.

    joinedDataDF = src1FormatDataDF.join(src2FormatDataDF, "primary_key").join(src3FormatDataDF, "primary_key")
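
    With the list from the note above, any number of renamed DataFrames can be folded into one join (a sketch using functools.reduce):

    from functools import reduce

    joinedDataDF = reduce(lambda left, right: left.join(right, "primary_key"), formatDataDFs)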
    

    Step 3 - Collate the data from all the corresponding columns into a single array column per attribute.

    from pyspark.sql.functions import array

    num = 3   # number of source DataFrames joined above
    for c in (x for x in src1DataDF.columns if x != 'primary_key'):
        joinedDataDF = joinedDataDF.withColumn(c, array(*(col(c + str(i)) for i in range(num))))
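
    On the performance question: every withColumn call adds another projection to the logical plan, which can get expensive for wide tables. An alternative (a sketch, equivalent to the loop above) is to build all the collated arrays in a single select, which also drops the suffixed columns and makes the separate select in Step 4 unnecessary:

    compareCols = [x for x in src1DataDF.columns if x != 'primary_key']
    joinedCollatedDataDF = joinedDataDF.select(
        'primary_key',
        *(array(*(col(c + str(i)) for i in range(num))).alias(c) for c in compareCols)
    )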
    

    Step 4 - Drop all the other columns, keeping only the key and the collated ones.

    joinedCollatedDataDF = joinedDataDF.select('primary_key', *(c for c in src1DataDF.columns if c != 'primary_key'))
    

    Step 5 - Conditions for selecting values where the number of distinct values in a collated column is more than 1, indicating that differences are present. For example, a collated value of [XYZ, PQR, JKL] has 3 distinct values and is kept, while [XYZ, XYZ, XYZ] has 1 and yields an empty string.

    from pyspark.sql.functions import when, size, array_distinct, lit, format_string

    diffColCondition_ = [when(size(array_distinct(c)) > 1, lit(c)).otherwise("") for c in joinedCollatedDataDF.columns if c != 'primary_key']
    diffValCondition_ = [when(size(array_distinct(c)) > 1, format_string('%s', joinedCollatedDataDF[c])).otherwise("") for c in joinedCollatedDataDF.columns if c != 'primary_key']
    

    Step 6 - Prepare the column list for selection, applying the above transformations.

    from pyspark.sql.functions import array_remove

    select_expr = [
                col("primary_key"),
                array_remove(array(*diffColCondition_), "").alias("diff_columns"),
                array_remove(array(*diffValCondition_), "").alias("diff_columns_value")
            ]
    

    Step 7 - Transform the DataFrame by zipping the lists of names and values of the differing attributes (to maintain granularity during the explode) and then exploding them to create a separate row for each item in the list.

    from pyspark.sql.functions import arrays_zip, explode

    finalResultDF = (joinedCollatedDataDF.select(*select_expr)
                        .withColumn("tmp", arrays_zip("diff_columns", "diff_columns_value"))
                        .withColumn("Result", explode("tmp"))
                        .select(col("primary_key"),
                                col("Result.diff_columns").alias("diff_attribute"),
                                col("Result.diff_columns_value").alias("diff_values")))
    
    Sample output at this stage - 
    
        +------------------------------------+--------------+-----------------+
        |primary_key                         |diff_attribute|diff_values      |
        +------------------------------------+--------------+-----------------+
        |{10002711957914718:0000000015945968}|Test_Col1     |[XYZ,PQR,JKL]    |
        |{10002711957914718:0000000015945968}|Test_Col2     |[ABC,MNO,DEF]    |
        |{10014135598021497:0000000213113137}|Test_Col1     |[XYZ,PQR,JKL]    |
        |{10014135598021497:0000000213113137}|Test_Col2     |[ABC,MNO,DEF]    |
        |{10018096228103255:0000000231866368}|Test_Col1     |[XYZ,PQR,JKL]    |
        |{10018096228103255:0000000231866368}|Test_Col2     |[ABC,MNO,DEF]    |
        +------------------------------------+--------------+-----------------+
    

    Step 8 - I keep a list of the source names in the order in which the DataFrames were read, and interleave the values from it to form the desired output.

    from pyspark.sql.functions import split, translate, to_json, map_from_arrays

    srcValMap = "[SRC1,SRC2,SRC3]"

    outputDataDF = (finalResultDF
                        .withColumn("srcValMapArray", array(*[lit(w) for w in srcValMap.strip('[]').split(',')]))   # form an array of source names
                        .withColumn("diffValuesArray", split(translate(col("diff_values"), "[]", ""), ","))         # form an array of values from the difference columns
                        .withColumn("src_val_map", to_json(map_from_arrays("srcValMapArray", "diffValuesArray")))   # zip source names with column values into a map and render it as a JSON string
                        .select("primary_key", "diff_attribute", "diff_values", "src_val_map"))
    
    Sample output - 
    
        +------------------------------------+--------------+----------------------------------------+
        |primary_key                         |diff_attribute|src_val_map                             |
        +------------------------------------+--------------+----------------------------------------+
        |{10002711957914718:0000000015945968}|Test_Col1     |{"SRC1":"XYZ","SRC2":"PQR","SRC3":"JKL"}|
        |{10002711957914718:0000000015945968}|Test_Col2     |{"SRC1":"ABC","SRC2":"MNO","SRC3":"DEF"}|
        |{10014135598021497:0000000213113137}|Test_Col1     |{"SRC1":"XYZ","SRC2":"PQR","SRC3":"JKL"}|
        |{10014135598021497:0000000213113137}|Test_Col2     |{"SRC1":"ABC","SRC2":"MNO","SRC3":"DEF"}|
        |{10018096228103255:0000000231866368}|Test_Col1     |{"SRC1":"XYZ","SRC2":"PQR","SRC3":"JKL"}|
        |{10018096228103255:0000000231866368}|Test_Col2     |{"SRC1":"ABC","SRC2":"MNO","SRC3":"DEF"}|
        +------------------------------------+--------------+----------------------------------------+
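
    A further optimization candidate (a sketch, not a drop-in replacement): Step 5 stringifies each array with format_string and Step 8 parses the string back with translate/split, which also risks stray whitespace after the commas. Keeping the arrays as arrays end to end avoids that round trip; diff_structs and altOutputDataDF are illustrative names:

    from pyspark.sql.functions import (array, array_distinct, col, explode, lit,
                                       map_from_arrays, size, struct, to_json, when)

    srcNames = ["SRC1", "SRC2", "SRC3"]

    # Emit a (name, values) struct only for columns with more than one distinct value;
    # non-differing columns become null elements and are filtered out after the explode
    diff_structs = [
        when(size(array_distinct(col(c))) > 1,
             struct(lit(c).alias("name"), col(c).alias("values")))
        for c in joinedCollatedDataDF.columns if c != 'primary_key'
    ]

    altOutputDataDF = (joinedCollatedDataDF
        .select("primary_key", explode(array(*diff_structs)).alias("d"))
        .where(col("d").isNotNull())
        .select("primary_key",
                col("d.name").alias("diff_attribute"),
                to_json(map_from_arrays(array(*[lit(s) for s in srcNames]),
                                        col("d.values"))).alias("src_val_map")))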