Fix
The issue was caused by mismatched data types: without an explicit schema, createDataFrame inferred the integer columns as LongType, which conflicted with the IntegerType columns in the existing Delta table. Explicitly declaring the schema resolved it.
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("last_name", StringType(), True),
    StructField("salary", IntegerType(), True),
])
Original Post
Resubmitting with some edits, as the old post was flagged for not being focused enough.
I'm new to PySpark and the parquet/delta ecosystem. I'm trying to write a script (using PySpark) that does the following:
1. Read a parquet file into a Spark DataFrame
2. Save that DataFrame as a Delta table
3. Read the Delta table back and check its details and version
4. Append a new DataFrame to the Delta table
I can get through step 2, but steps 3 and 4 error out. Any ideas?
Script
import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *
from delta.tables import *
from pyspark import SparkFiles
from pyspark.context import SparkContext

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

print("Kernel:", sys.executable)
print("Python version:", sys.version)
print("Spark version:", spark.version)
print("PySpark version:", pyspark.__version__)
print("PySpark Version :", spark.sparkContext.version)
delta_table_path = "output/data_delta_table"
.......
.......
# save to delta format file (overwrite if exists)
spark_dataframe_parq.write.mode(saveMode="overwrite").format("delta").save(delta_table_path)
# read delta table
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# check table details
print ("Delta Table details: ", deltaTable.detail())
# check version before
print ("Delta Table Version Before : " , deltaTable.version()) # <<< Issue 1
# Add new spark dataframe and append to delta table
schema = ("_id","department_id","first_name","id","last_name","salary")
data = [("6407c350840f10d7f3e769f8",1500,"Justin",2002,"Simon",100300)]
new_df = spark.createDataFrame(data, schema)
new_df.write.format("delta").mode("append").save(delta_table_path) # <<< Issue 2
Output
Python version: 3.9.6 (default, Mar 29 2024, 10:51:09)
[Clang 15.0.0 (clang-1500.3.9.4)]
Spark version: 3.5.1
PySpark version: 3.5.1
PySpark Version : 3.5.1
...
...
...
Delta Table details : DataFrame[format: string, id: string, name: string, description: string, location: string, createdAt: timestamp, lastModified: timestamp, partitionColumns: array<string>, clusteringColumns: array<string>, numFiles: bigint, sizeInBytes: bigint, properties: map<string,string>, minReaderVersion: int, minWriterVersion: int, tableFeatures: array<string>]
Traceback (most recent call last):
File "/Users/foobar/workspace/practice/deltalake/parquet_delta_example_using_spark.py", line 151, in <module>
print ("Delta Table Version Before : " , deltaTable.version())
AttributeError: 'DeltaTable' object has no attribute 'version'
...
...
Traceback (most recent call last):
File "/Users/foobar/workspace/practice/deltalake/parquet_delta_example_using_spark.py", line 157, in <module>
new_df.write.format("delta").mode("append").save(delta_table_path)
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/sql/readwriter.py", line 1463, in save
self._jwrite.save(path)
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/foobar/Library/Python/3.9/lib/python/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'department_id' and 'department_id'
It seems your schema is incompatible. Try to validate whether the department_id type in the original (saved) Delta table aligns with the type in the DataFrame you want to append.
See the similar issue here.
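As a quick check (a sketch reusing delta_table_path and new_df from the post), you can compare the two schemas before appending:

# Schema of the existing Delta table on disk
spark.read.format("delta").load(delta_table_path).printSchema()
# Schema of the DataFrame being appended; without an explicit schema,
# Python ints are inferred as LongType rather than IntegerType
new_df.printSchema()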