pyspark, databricks

How to convert DataFrame into an integer in PySpark Databricks?


I'm new to PySpark and currently working in Databricks. I'm comparing two DataFrames with the same column structure (essentially a file already loaded into the database against a new one). During the comparison, I count the number of changes made to each variable using the code below:

from pyspark.sql.functions import col, sum, when

# 'a' and 'b' are the aliases of the two DataFrames being compared
Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))

# Summarizing the number of changes
Change_To_Value1 = Comparison_DF.select(sum("Value1_Change"))
Change_To_Value2 = Comparison_DF.select(sum("Value2_Change"))

# Forming the change report DataFrame
# columns=["Type of Change", "Number of Occurrences"]
data = [("Change to Value1", Change_To_Value1), ("Change to Value2", Change_To_Value2)]

rdd = spark.sparkContext.parallelize(data)
print(data) 

The line rdd = spark.sparkContext.parallelize(data) raises an error. After examining the traceback, I realized that Change_To_Value1 and Change_To_Value2 are not plain integer values but single-row DataFrames. The print(data) statement gives the following result: [('Change to Value1', DataFrame[sum(Value1_Change): bigint]), ('Change to Value2', DataFrame[sum(Value2_Change): bigint])].

I need to form this kind of DataFrame to use it as a Change Report, for comparison with results returned by an SSIS package.

I haven't found anything similar on Stack Overflow or anywhere else. I also tried building a loop to collect these DataFrames and feed them into a new one directly, but that failed as well.

Is there a way to convert these DataFrames into int variables? Or is there a better way to form this DataFrame?


Solution

  • If I understand correctly, you want to create a DataFrame that looks like this:

    Type of Change      Number of Occurrences
    Change to Value1    xxx
    Change to Value2    yyy

    Here's how I would approach it:

    # Keep the first two lines as is:
    Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
    Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))

    # Then, modify the next part slightly: import the functions module and
    # attach a label to each aggregate, so every result is a single-row
    # DataFrame that already carries both report columns
    import pyspark.sql.functions as F

    Change_To_Value1 = Comparison_DF.select(
        F.lit("Change to Value1").alias("Type of Change"),
        F.sum("Value1_Change").alias("Number of Occurrences"),
    )
    Change_To_Value2 = Comparison_DF.select(
        F.lit("Change to Value2").alias("Type of Change"),
        F.sum("Value2_Change").alias("Number of Occurrences"),
    )

    # Combine the two single-row DataFrames into the report
    data = Change_To_Value1.unionByName(Change_To_Value2)

    # Additionally, if you need the data as an RDD (though not recommended):
    data.rdd
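
    If the real report covers more than two columns, the loop you attempted can be made to work by building one labelled single-row DataFrame per change column and reducing them with unionByName. A minimal sketch, assuming the flag columns follow the "<name>_Change" pattern used above (change_columns is a hypothetical list you would maintain):

    from functools import reduce

    # Hypothetical list of the change-flag columns created earlier
    change_columns = ["Value1_Change", "Value2_Change"]

    per_column_counts = [
        Comparison_DF.select(
            F.lit("Change to " + c.replace("_Change", "")).alias("Type of Change"),
            F.sum(c).alias("Number of Occurrences"),
        )
        for c in change_columns
    ]

    # Union all the single-row DataFrames into one report
    data = reduce(lambda left, right: left.unionByName(right), per_column_counts)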
    

    Either approach gives you the desired DataFrame structure. Note that using RDDs is generally not recommended unless necessary, as the DataFrame API is better optimized for PySpark operations.
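
    To answer the question as literally asked: yes, you can also collapse each single-row aggregate into a plain Python int and build the report from those. A minimal sketch, assuming Comparison_DF is non-empty (first() returns None for an empty DataFrame, and sum() returns null when every input is null):

    # Pull each aggregate out of its single-row DataFrame as a Python int
    value1_changes = int(Comparison_DF.select(F.sum("Value1_Change")).first()[0])
    value2_changes = int(Comparison_DF.select(F.sum("Value2_Change")).first()[0])

    # These are ordinary ints, so building the report directly works too
    report = spark.createDataFrame(
        [("Change to Value1", value1_changes), ("Change to Value2", value2_changes)],
        ["Type of Change", "Number of Occurrences"],
    )

    That said, collecting values to the driver forces a separate Spark job per aggregate, so the pure-DataFrame union above is usually the better choice.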