pyspark, delta

Appending a DataFrame to an existing Delta table throws DELTA_FAILED_TO_MERGE_FIELDS error


Fix

The issue was due to mismatched data types between the new DataFrame and the existing table. Explicitly declaring the schema resolved it.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("_id", StringType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("last_name", StringType(), True),
    StructField("salary", IntegerType(), True),
])

Original Post

Resubmitting with some edits, as the old one was flagged for not being focused enough.

I'm new to PySpark and the Parquet/Delta ecosystem. I'm trying to write a script (using PySpark) that does the following:

  1. Save a parquet file in delta table format.
  2. Create a delta table object on top of that file.
  3. Add record to table.
  4. Check table version after operation.

I am able to get through step 2, but steps 3 and 4 error out. Any ideas?

Script

import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *
from delta.tables import *
from pyspark.sql.types import *

from pyspark import SparkFiles
from pyspark.context import SparkContext

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

print("Kernel:", sys.executable)
print("Python version:", sys.version)
print("Spark version:", spark.version)
print("PySpark version:", pyspark.__version__)
print("PySpark Version :", spark.sparkContext.version)

delta_table_path = "output/data_delta_table"

.......
.......

# save to delta format file (overwrite if exists)
spark_dataframe_parq.write.mode(saveMode="overwrite").format("delta").save(delta_table_path)

# read delta table
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# check table details (detail() returns a DataFrame)
print("Delta Table details: ", deltaTable.detail())

# check version before
print("Delta Table Version Before : ", deltaTable.version())  # <<< Issue 1

# build a new spark dataframe and append it to the delta table
schema = ("_id", "department_id", "first_name", "id", "last_name", "salary")
data = [("6407c350840f10d7f3e769f8", 1500, "Justin", 2002, "Simon", 100300)]
new_df = spark.createDataFrame(data, schema)

new_df.write.format("delta").mode("append").save(delta_table_path)  # <<< Issue 2

Output

Python version: 3.9.6 (default, Mar 29 2024, 10:51:09)
[Clang 15.0.0 (clang-1500.3.9.4)]
Spark version: 3.5.1
PySpark version: 3.5.1
PySpark Version : 3.5.1
...
...
...
Delta Table details :  DataFrame[format: string, id: string, name: string, description: string, location: string, createdAt: timestamp, lastModified: timestamp, partitionColumns: array<string>, clusteringColumns: array<string>, numFiles: bigint, sizeInBytes: bigint, properties: map<string,string>, minReaderVersion: int, minWriterVersion: int, tableFeatures: array<string>]
Traceback (most recent call last):
  File "/Users/foobar/workspace/practice/deltalake/parquet_delta_example_using_spark.py", line 151, in <module>
    print("Delta Table Version Before : ", deltaTable.version())
AttributeError: 'DeltaTable' object has no attribute 'version'
...
...
Traceback (most recent call last):
  File "/Users/foobar/workspace/practice/deltalake/parquet_delta_example_using_spark.py", line 157, in <module>
    new_df.write.format("delta").mode("append").save(delta_table_path)
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/sql/readwriter.py", line 1463, in save
    self._jwrite.save(path)
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'department_id' and 'department_id'

Solution

  • It seems your schemas are incompatible. Check whether the department_id type in the original (saved) Delta table matches the type in the DataFrame you want to append: createDataFrame on plain Python tuples infers LongType for integers, which will not merge with an IntegerType column.
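
A quick way to confirm the mismatch, and to read the table version without the nonexistent version() method (Issue 1), is sketched below, assuming the spark session and delta_table_path from the post:

from delta.tables import DeltaTable

# Compare the saved table's schema against the DataFrame being appended.
spark.read.format("delta").load(delta_table_path).printSchema()  # expect: department_id: integer
new_df.printSchema()  # expect: department_id: long (inferred from a Python int)

# DeltaTable has no version() attribute; the current version is the first
# row of the commit history.
deltaTable = DeltaTable.forPath(spark, delta_table_path)
latest_version = deltaTable.history(1).select("version").collect()[0][0]
print("Delta Table Version:", latest_version)

Alternatively, cast the offending columns before appending, e.g. new_df.withColumn("department_id", col("department_id").cast("integer")), or declare the schema explicitly as shown in the Fix above.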