databricks, delta-live-tables

Delta Live Tables process fails


I'm trying to load a Delta Live Table. The input is Avro files (one file every 15 minutes). The output is an SCD type 1 Delta table in Databricks.

Here is my code in Databricks:

# Databricks notebook source
import dlt
from pyspark.sql import functions as F, Window as W

# COMMAND ----------

# Infer the schema from the Avro files already landed, using a batch read
schema = (
    spark.read.format("avro")
    .load(
        "/pim_raw_rec/Pimproduct_to_process/"
    )
    .schema
)

# COMMAND ----------

@dlt.table
def pimproduct_to_process():
    # Stream new Avro files as they arrive, reusing the inferred schema
    return (
        spark.readStream.format("avro")
        .schema(schema)
        .load(
            "/Pimproduct_to_process/"
        )
    )

# COMMAND ----------

dlt.create_streaming_table("pimproduct_delta")

# COMMAND ----------

dlt.apply_changes(
  target = "pimproduct_delta",
  source = "pimproduct_to_process",
  keys = ["id", "contextid"],
  sequence_by = F.col("pim_timestamp").desc(),
  apply_as_deletes = F.expr("action = 'D'"),
  stored_as_scd_type = 1
)

The pimproduct_to_process schema is:

Name            Type
id              string
contextid       string
name            string
usertypeid      string
parentid        string
content         string
action          string
pim_timestamp   string
to_process_ts   int

When executing in Delta Live Tables, I get this error:

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = e973ae65-631b-45d4-95b7-61ba510f1d0e, runId = 52a1f311-dcf0-49c0-ab63-91d3d79f09b9] terminated with exception: [INTERNAL_ERROR] Cannot generate code for expression: input[7, string, true] DESC NULLS LAST

The execution in a notebook is OK, and the workflow is generated, but it fails when writing pimproduct_delta. Any idea what I should look for?


Solution

  • From what I see in your schema, pim_timestamp is of type string. You should convert it to TIMESTAMP and make sure it is NOT NULL. I think the sequencing column can be either a timestamp or a numeric type; the __START_AT and __END_AT columns generated for SCD type 2 tables will have the same type as it.

    Also, you should not use DESC here: sequence_by expects a plain column reference, and the DESC NULLS LAST sort order is exactly the expression the error message says it cannot generate code for.
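
    A minimal sketch of what that could look like. The pimproduct_sequenced view name is my own, and the bare to_timestamp call is an assumption (you may need to pass an explicit format string for your actual timestamp values):

    import dlt
    from pyspark.sql import functions as F

    # Hypothetical intermediate view: cast the string column to a real
    # TIMESTAMP and drop rows where the cast produced NULL
    @dlt.view
    def pimproduct_sequenced():
        return (
            dlt.read_stream("pimproduct_to_process")
            .withColumn("pim_timestamp", F.to_timestamp("pim_timestamp"))
            .filter(F.col("pim_timestamp").isNotNull())
        )

    dlt.create_streaming_table("pimproduct_delta")

    dlt.apply_changes(
        target = "pimproduct_delta",
        source = "pimproduct_sequenced",
        keys = ["id", "contextid"],
        sequence_by = F.col("pim_timestamp"),  # plain column, no .desc()
        apply_as_deletes = F.expr("action = 'D'"),
        stored_as_scd_type = 1
    )

    apply_changes always keeps the row with the highest sequence_by value per key, so there is no need to sort descending yourself.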