I'm trying to load a Delta Live Table. The input is Avro files (one file every 15 minutes) and the output is an SCD type 1 Delta table in Databricks.
Here is my code in Databricks:
# Databricks notebook source
import dlt
from pyspark.sql import functions as F, Window as W

# COMMAND ----------

# Infer the schema from the Avro files already landed in the raw folder
schema = (
    spark.read.format("avro")
    .load("/pim_raw_rec/Pimproduct_to_process/")
    .schema
)

# COMMAND ----------

@dlt.table
def pimproduct_to_process():
    return (
        spark.readStream.format("avro")
        .schema(schema)
        .load("/Pimproduct_to_process/")
    )

# COMMAND ----------

dlt.create_streaming_table("pimproduct_delta")

# COMMAND ----------

dlt.apply_changes(
    target = "pimproduct_delta",
    source = "pimproduct_to_process",
    keys = ["id", "contextid"],
    sequence_by = F.col("pim_timestamp").desc(),
    apply_as_deletes = F.expr("action = 'D'"),
    stored_as_scd_type = 1
)
The schema is:

Name            Type
id              string
contextid       string
name            string
usertypeid      string
parentid        string
content         string
action          string
pim_timestamp   string
to_process_ts   int
When executing in Delta Live Tables, I get the error:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = e973ae65-631b-45d4-95b7-61ba510f1d0e, runId = 52a1f311-dcf0-49c0-ab63-91d3d79f09b9] terminated with exception: [INTERNAL_ERROR] Cannot generate code for expression: input[7, string, true] DESC NULLS LAST
The same code runs fine in a notebook. The workflow is generated but fails when writing pimproduct_delta. Any idea what I should look for?
From what I can see in your schema, pim_timestamp is of type string. You should convert it to TIMESTAMP and make sure it is NOT NULL. The __START_AT and __END_AT columns of an SCD table will have the same type as the sequencing column, and I think the sequencing column can be either a timestamp or a numeric type.
Also, you should not use DESC here: the error message shows Spark cannot generate code for the DESC NULLS LAST sort-order expression, and sequence_by just takes the column used to order the events.
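With the column converted, the apply_changes call would pass it without .desc(); APPLY CHANGES already keeps the row with the highest sequence value per key, so no descending order is needed. A sketch based on your existing code:

dlt.apply_changes(
    target = "pimproduct_delta",
    source = "pimproduct_to_process",
    keys = ["id", "contextid"],
    sequence_by = F.col("pim_timestamp"),   # plain column, no .desc()
    apply_as_deletes = F.expr("action = 'D'"),
    stored_as_scd_type = 1
)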