Using Auto Loader, I am reading continuous data from storage into a Databricks Delta Live Table. The pipeline is declared as follows.
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
sch = "StructType([StructField('Date', StringType(), True), StructField('machine', StringType(), True), StructField('temperature', DecimalType(), True), StructField('time', StringType(), True)])"
@dlt.create_table(
    comment="The raw machine data, ingested from azure storage.",
    table_properties={
        "myCompanyPipeline.quality": "raw",
        "pipelines.autoOptimize.managed": "true"
    }
)
def test_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("schema", sch)
        .option("cloudFiles.schemaLocation", "/FileStore/schema")
        .option("cloudFiles.format", "json")
        .load("..../")
    )
The dataset I am reading from storage looks like this.
{"Date":"2023-10-16","time":"12:00:00","machine":"Machine1","temperature":"23.50"}
{"Date":"2023-10-16","time":"12:00:01","machine":"Machine2","temperature":"...corrupt temp..."}
{"Date":"2023-10-16","time":"12:00:02","machine":"Machine3","temperature":"27.50"}
Unfortunately, the pipeline does not fail on the invalid (non-decimal) "temperature" value and processes all records successfully. Ideally it should fail, because the temperature column is defined as a Decimal data type.
Can someone please explain why this schema enforcement is not working?
The problem was resolved by passing the schema with
spark.readStream.format("cloudFiles").schema(sch)
in place of
spark.readStream.format("cloudFiles").option("schema",sch)
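For anyone hitting the same thing, here is a minimal sketch of the working form. My understanding is that option("schema", ...) just sets an ordinary reader option named "schema" and does not set the DataFrame schema, so Auto Loader falls back to schema inference, which for JSON treats every column as a string by default. That would explain why the corrupt temperature passed through. The names MACHINE_SCHEMA_DDL and read_machine_data are hypothetical, and DECIMAL(10,2) is my assumption to keep two fractional digits (DecimalType() with no arguments defaults to precision 10, scale 0). Also note that .schema() expects a StructType object or a DDL-formatted string, so the sketch uses a DDL string rather than the quoted StructType(...) text.

```python
# Hypothetical constant: the schema as a DDL-formatted string, which
# DataStreamReader.schema() accepts in place of a StructType object.
# DECIMAL(10,2) is an assumption, not taken from the original post.
MACHINE_SCHEMA_DDL = (
    "`Date` STRING, `machine` STRING, "
    "`temperature` DECIMAL(10,2), `time` STRING"
)


def read_machine_data(spark, source_path):
    """Build the Auto Loader stream with an explicit schema.

    `spark` is the active SparkSession and `source_path` is the storage
    location to read from (both supplied by the caller).
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        # .schema(...) sets the schema; .option("schema", ...) does not.
        .schema(MACHINE_SCHEMA_DDL)
        .load(source_path)
    )
```

Inside the pipeline, the body of test_raw() would then return read_machine_data(spark, <your source path>) instead of building the reader inline.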