I'm new to PySpark and currently working in Databricks, comparing two DataFrames that share the same column structure; essentially, I'm checking a file already loaded into the database against a new one. During the process, I calculate the number of changes made to each variable.
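For reference, `DF1_Data_To_Compare` comes from an aliased join of the existing and the new DataFrame, something like the sketch below (the DataFrame names and the single `ID` join key are illustrative placeholders):

```python
from pyspark.sql.functions import col

# Illustrative setup: alias the existing load as "a" and the new file as "b",
# joined on a shared key so both versions of every column sit side by side.
DF1_Data_To_Compare = Existing_DF.alias("a").join(
    New_DF.alias("b"), on=col("a.ID") == col("b.ID"), how="inner"
)
```

The change flags are then calculated with the code below: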
```python
from pyspark.sql.functions import col, when, sum

Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))

# Summarizing the number of changes
Change_To_Value1 = Comparison_DF.select(sum("Value1_Change"))
Change_To_Value2 = Comparison_DF.select(sum("Value2_Change"))

# Forming the change report DataFrame
# columns=["Type of Change", "Number of Occurrences"]
data = [("Change to Value1", Change_To_Value1), ("Change to Value2", Change_To_Value2)]
rdd = spark.sparkContext.parallelize(data)
print(data)
```
The line `rdd = spark.sparkContext.parallelize(data)` raises an error. After examining the traceback, I realized that `Change_To_Value1` and `Change_To_Value2` are not scalar values but single-row DataFrames. The `print(data)` statement gives the following result: `[('Change to Value1', DataFrame[sum(Value1_Change): bigint]), ('Change to Value2', DataFrame[sum(Value2_Change): bigint])]`.
I need to produce this kind of DataFrame to use as a change report, for comparison against the results returned by an SSIS package. I haven't found anything similar on Stack Overflow or anywhere else, and my attempt to build a loop that collects these DataFrames and feeds them into a new one directly failed as well.

Is there a way to convert these DataFrames into plain int variables? Or is there a better way to build this DataFrame?
If I understand correctly, you want to create a DataFrame that looks like this:
| Type of Change   | Number of Occurrences |
|------------------|-----------------------|
| Change to Value1 | xxx                   |
| Change to Value2 | yyy                   |
Here's how I would approach it:
Keep your first two lines as they are (I'm importing `pyspark.sql.functions` as `F` and using it consistently below):

```python
from pyspark.sql import functions as F

Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", F.when(F.col("b.Value1") == F.col("a.Value1"), 0).otherwise(1))
Comparison_DF = Comparison_DF.withColumn("Value2_Change", F.when(F.col("b.Value2") == F.col("a.Value2"), 0).otherwise(1))
```
Then modify the aggregation step slightly, so that each result already carries its label and a proper column name (note `F.sum`, not Python's built-in `sum`):

```python
Change_To_Value1 = Comparison_DF.select(
    F.lit("Change to Value1").alias("Type of Change"),
    F.sum("Value1_Change").alias("Number of Occurrences"),
)
Change_To_Value2 = Comparison_DF.select(
    F.lit("Change to Value2").alias("Type of Change"),
    F.sum("Value2_Change").alias("Number of Occurrences"),
)
```
Each of these is now a single-row DataFrame with the two report columns, so you can combine them directly:

```python
# Combine the two single-row DataFrames into the report
data = Change_To_Value1.unionByName(Change_To_Value2)

# Additionally, if you need the data as an RDD (though not recommended):
data.rdd
```
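If you want to sanity-check the result, `data.show()` should print the report in roughly this shape (with `xxx` and `yyy` standing in for the actual counts):

```python
data.show()
# +----------------+---------------------+
# |  Type of Change|Number of Occurrences|
# +----------------+---------------------+
# |Change to Value1|                  xxx|
# |Change to Value2|                  yyy|
# +----------------+---------------------+
```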
This approach should give you the desired DataFrame structure. Note that dropping down to RDDs is generally not recommended unless you really need to, as Spark can optimize DataFrame operations much more effectively.
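As for your side question about converting these DataFrames into int variables: you can collapse a single-row aggregate DataFrame to a plain Python scalar with `first()`. A small sketch reusing the names from your question:

```python
# .first() returns the single Row; indexing it yields the Python value.
value1_changes = Comparison_DF.select(F.sum("Value1_Change")).first()[0]
value2_changes = Comparison_DF.select(F.sum("Value2_Change")).first()[0]
```

That said, keeping everything as DataFrames, as above, avoids pulling data to the driver until you actually need it.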