pyspark, apache-spark-sql, aws-glue

Spark HashAggregate swapping columns in Spark SQL


I have an AWS Glue job that uses Spark SQL to join two dataframes. The job ran weekly without issue for 6 months, then suddenly the join started swapping values in the resulting dataset. The schemas of the two dataframes did not change, yet the results have come back swapped (always in the same order) on every weekly run for the last 6 months.

This is happening in all 7 of the different Spark joins in my Glue job.

Running only "select * from df_view" through Spark SQL does not mix up the columns. Spark mixes up the join result whether I use Spark SQL or the DataFrame .join method from the Spark API (a sketch of the .join version is below).
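The DataFrame-API version I also tried looks roughly like this (a sketch; the exact code in my job differs slightly):

# same join expressed with the DataFrame API -- it produces the same swapped columns
joined_df = (
    renamed_df2.join(
        counts_df.select("prod_id", "var_sku", "var_index"),
        on=["prod_id", "var_sku"],
        how="inner",
    )
    .orderBy("prod_id", "var_sku", "run_date")
)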

Looking at the query plan, the swap appears to take place in the HashAggregate at step (7). Does anyone know why this happened and how to resolve it?

counts_df.createOrReplaceTempView("counts_vw")
logger.info(counts_df.printSchema())
renamed_df2.createOrReplaceTempView("tmp_vw")
logger.info(renamed_df2.printSchema())

#pull in the var_index field
spark_df = spark.sql("""select tmp_vw.*, counts_vw.var_index
                        from tmp_vw, counts_vw
                        where tmp_vw.prod_id = counts_vw.prod_id
                            and tmp_vw.var_sku = counts_vw.var_sku
                        order by prod_id, var_sku, run_date
                    """)

logger.info(spark_df.explain('FORMATTED'))

output:

root
 |-- prod_id: string (nullable = true)
 |-- var_id: string (nullable = true)
 |-- var_sku: string (nullable = true)
 |-- var_public_title: string (nullable = true)
 |-- var_index: string (nullable = true)

root
 |-- run_date: string (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- prod_vendor: string (nullable = true)
 |-- prod_type: string (nullable = true)
 |-- var_id: string (nullable = true)
 |-- var_price: string (nullable = true)
 |-- var_name: string (nullable = true)
 |-- var_public_title: string (nullable = true)
 |-- var_sku: string (nullable = true)
 |-- url: string (nullable = true)

== Physical Plan ==
AdaptiveSparkPlan (21)
+- Sort (20)
   +- Exchange (19)
      +- Project (18)
         +- SortMergeJoin Inner (17)
            :- Sort (11)
            :  +- Exchange (10)
            :     +- HashAggregate (9)
            :        +- Exchange (8)
            :           +- HashAggregate (7)
            :              +- Union (6)
            :                 :- Filter (2)
            :                 :  +- Scan ExistingRDD (1)
            :                 +- Project (5)
            :                    +- Filter (4)
            :                       +- Scan ExistingRDD (3)
            +- Sort (16)
               +- Exchange (15)
                  +- Project (14)
                     +- Filter (13)
                        +- Scan ExistingRDD (12)

(1) Scan ExistingRDD
Output [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Arguments: [run_date#202, prod_id#203, prod_vendor#204, prod_...

(2) Filter
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Condition : (isnotnull(prod_id#203) AND isnotnull(var_sku#210))

(3) Scan ExistingRDD
Output [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
Arguments: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231], MapPartitionsRDD[78] at map at DynamicFrame.scala:384, ExistingRDD, UnknownPartitioning(0)

(4) Filter
Input [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
Condition : (isnotnull(cast(prod_id#223L as string)) AND isnotnull(var_sku#230))

(5) Project
Output [10]: [run_date#222, cast(prod_id#223L as string) AS prod_id#253, prod_vendor#224, prod_type#225, cast(var_id#226L as string) AS var_id#254, cast(var_price#227 as string) AS var_price#255, var_name#228, var_public_title#229, var_sku#230, substring(url#231, 44, 200) AS url#242]
Input [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]

(6) Union

(7) HashAggregate
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Keys [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Functions: []
Aggregate Attributes: []
Results [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]

(8) Exchange
Input [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Arguments: hashpartitioning(prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207, 8), ENSURE_REQUIREMENTS, [id=#723]

(9) HashAggregate
Input [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Keys [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Functions: []
Aggregate Attributes: []
Results [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, translate(url#211, ?,, ) AS url#266]

(10) Exchange
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266]
Arguments: hashpartitioning(prod_id#203, var_sku#210, 8), ENSURE_REQUIREMENTS, [id=#727]

(11) Sort
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266]
Arguments: [prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST], false, 0

(12) Scan ExistingRDD
Output [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
Arguments: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325], MapPartitionsRDD[137] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(13) Filter
Input [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
Condition : (isnotnull(prod_id#321) AND isnotnull(var_sku#323))

(14) Project
Output [3]: [prod_id#321, var_sku#323, var_index#325]
Input [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]

(15) Exchange
Input [3]: [prod_id#321, var_sku#323, var_index#325]
Arguments: hashpartitioning(prod_id#321, var_sku#323, 8), ENSURE_REQUIREMENTS, [id=#728]

(16) Sort
Input [3]: [prod_id#321, var_sku#323, var_index#325]
Arguments: [prod_id#321 ASC NULLS FIRST, var_sku#323 ASC NULLS FIRST], false, 0

(17) SortMergeJoin
Left keys [2]: [prod_id#203, var_sku#210]
Right keys [2]: [prod_id#321, var_sku#323]
Join condition: None

(18) Project
Output [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Input [13]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, prod_id#321, var_sku#323, var_index#325]

(19) Exchange
Input [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: rangepartitioning(prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST, run_date#202 ASC NULLS FIRST, 8), ENSURE_REQUIREMENTS, [id=#734]

(20) Sort
Input [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: [prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST, run_date#202 ASC NULLS FIRST], true, 0

(21) AdaptiveSparkPlan
Output [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: isFinalPlan=false
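For reference, the Union (6) feeding back-to-back HashAggregates (7) and (9), with every column as a grouping key and no aggregate functions, is the plan Spark emits for a distinct()/dropDuplicates() over a union, which matches what happens earlier in this job where two extracts are unioned and deduplicated. Roughly (placeholder dataframe names; the real code differs):

# earlier in the job: union the two extracts and drop exact duplicate rows;
# this dedup shows up in the plan as Union (6) -> HashAggregate (7)
# -> Exchange (8) -> HashAggregate (9)
renamed_df2 = extract_df_a.union(extract_df_b).dropDuplicates()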

The resulting dataframe has incorrect values (from other rows in the input tables), and mismatched fields:

  1. The run_date field has the value of var_sku for the wrong join record
  2. The prod_id field has the run_date value for the correct join
  3. The prod_vendor field has the value of the section field for the correct join
  4. The prod_type field has the prod_id value for the correct join, and so on...

This is a mess. Can anyone help me understand what's happening? Thank you!

I have tried the same join with Spark SQL and with the Spark join API; same result. Instead of using "select *", I have listed every column name explicitly (sketch below); same result. In AWS Glue I have cloned the job and changed the Glue version and Spark version. Nothing helps.
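The explicit-column variant looked roughly like this (a sketch; the column list comes from the schemas printed above):

spark_df = spark.sql("""select tmp_vw.run_date, tmp_vw.prod_id, tmp_vw.prod_vendor,
                               tmp_vw.prod_type, tmp_vw.var_id, tmp_vw.var_price,
                               tmp_vw.var_name, tmp_vw.var_public_title,
                               tmp_vw.var_sku, tmp_vw.url, counts_vw.var_index
                        from tmp_vw, counts_vw
                        where tmp_vw.prod_id = counts_vw.prod_id
                            and tmp_vw.var_sku = counts_vw.var_sku
                        order by prod_id, var_sku, run_date
                    """)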

----- Update -----

I'm also seeing that if I use pandas instead of Spark, the columns end up in the wrong place.

I also see that the Project step of my query plan has issues:

(1) Scan ExistingRDD
Output [10]: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275]
Arguments: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275], MapPartitionsRDD[80] at map at DynamicFrame.scala:374, ExistingRDD, UnknownPartitioning(0)

(2) Project
Output [4]: [prod_id#267, var_id#270, var_sku#274, var_public_title#273]
Input [10]: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275]


Solution

  • FYI, this issue was resolved by removing the "optimizePerformance": True setting from the CSV reader:

    print("Pulling data from S3 extract") 
    s3_extract_dyf= glueContext.create_dynamic_frame.from_options(
       format_options={ "quoteChar": '"', 
                       "withHeader": True, 
                       "separator": ",", 
                       "optimizePerformance": True, <<<<<<<<<< remove this line!
                    },
      connection_type="s3", 
      format="csv", 
      connection_options={"paths": ["s3://xxxxxxxx/"], "recurse": True},
      transformation_ctx="s3_extract_dyf", )
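As far as I can tell, "optimizePerformance" switches on Glue's vectorized SIMD CSV reader (available from Glue 3.0); without it, Glue falls back to the standard row-based CSV reader. The working call is simply the block above with that one line dropped:

    print("Pulling data from S3 extract")
    s3_extract_dyf = glueContext.create_dynamic_frame.from_options(
        format_options={
            "quoteChar": '"',
            "withHeader": True,
            "separator": ",",
            # "optimizePerformance" intentionally omitted -- default CSV reader
        },
        connection_type="s3",
        format="csv",
        connection_options={"paths": ["s3://xxxxxxxx/"], "recurse": True},
        transformation_ctx="s3_extract_dyf",
    )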