I am trying to read data from parquet files in blob storage in Databricks and write them to a Delta table.
Cluster config = 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
1. df = spark.read.format("parquet").load("/mnt/path")  -- reading successfully
2. df.write.format("delta").mode("overwrite").saveAsTable(path)
The write step fails with this error: SchemaColumnConvertNotSupportedException: column: [Col_Name], physicalType: INT64, logicalType: string
I have tried extracting the schema from the parquet files and enforcing it while reading, but I still get the error. I have also tried different Spark conf settings with no result.
# Extracting the schema
myschema = spark.read.option("header","true").option("recursiveFileLookup","true").parquet("/mnt/path").schema
print(myschema)
StructType([StructField('GatewayID', StringType(), True), StructField('Version', StringType(), True), StructField('Generation', StringType(), True), StructField('AppVersion', StringType(), True), StructField('UnitEntity', StringType(), True), StructField('SubID', StringType(), True), StructField('SIMCardID', StringType(), True), StructField('UnitNumber', StringType(), True), StructField('UnitType', StringType(), True), StructField('ISOCountryCode', StringType(), True), StructField('ReportTime', LongType(), True), StructField('MessageFormat', StringType(), True), StructField('MessagesAsString', StringType(), True)])
# Imposing the schema
df = spark.read.format("parquet").schema(myschema).load("/mnt/path")
# Writing to datalake
df.write.format("delta").mode("overwrite").saveAsTable(path)
Error:
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [SubID], physicalType: INT64, logicalType: string
Analysis:
As you can see, the SubID column is string in the extracted schema, but according to the error the physical type in the target parquet file is INT64.
I have also tried converting the data type in the DataFrame and writing to Delta, but I get the same schema error.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
# Cast SubID to int before writing -- still fails with the same error
df = df.withColumn("SubID", col("SubID").cast(IntegerType()))
As per this article by @shanmugavel.chandrakasu:
From Databricks 7.3 and above, when reading parquet files, Spark reads them in vectorized format. This might be the reason it is taking the native data type string (the data type taken when reading the parquet file) instead of the cast data type int.
First, try disabling the vectorized reader and then read the parquet file from the source.
spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
Now, read the parquet file and check whether the data types of the source and target columns are the same.
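A minimal sketch of that check might look like the following; the mount path is the one from your question, and "my_delta_table" is a placeholder for your actual target table name.

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# Re-read the source parquet files and inspect the inferred schema
src_df = spark.read.format("parquet").load("/mnt/path")
src_df.printSchema()

# Inspect the existing target Delta table schema (if the table already exists)
spark.table("my_delta_table").printSchema()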
If not, use the below code to cast the column to the INT64 data type. It is mentioned that your target has INT64, which is long, so use LongType while casting.
from pyspark.sql.functions import col
from pyspark.sql.types import LongType
# Cast SubID to long (INT64) to match the physical type in the parquet files
df = df.withColumn("SubID", col("SubID").cast(LongType()))
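Putting the steps above together, an end-to-end sketch would look like this (assuming the same /mnt/path source; "delta_target" is a placeholder table name, not something from your setup):

from pyspark.sql.functions import col
from pyspark.sql.types import LongType

# 1. Disable the vectorized parquet reader
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# 2. Read the source parquet files
df = spark.read.format("parquet").load("/mnt/path")

# 3. Cast SubID to long (INT64) so it matches the physical type in the files
df = df.withColumn("SubID", col("SubID").cast(LongType()))

# 4. Overwrite the Delta table ("delta_target" is a placeholder name)
df.write.format("delta").mode("overwrite").saveAsTable("delta_target")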