python, parallel-processing, azure-databricks, delta-live-tables

How can I apply expectations to all the columns in a table in DLT?


I have seen tutorials and articles everywhere that show how to use DLT data quality expectations on a few columns of a table. Below is the code:

import dlt
from pyspark.sql.functions import expr

@dlt.table(
  comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
  return (
    dlt.read("clickstream_raw")
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_title", "click_count", "previous_page_title")
  )

Here, while specifying the expectations, we have to manually list the columns they apply to. But I want them to run for all the columns in the DataFrame.

To apply the expectations to all the columns in the DataFrame, I used a loop and dynamically changed the function name (which acts as the table name). But it is highly inefficient.

import dlt
from pyspark.sql.functions import col

for column in columns_list_order_table:
    exec(f'''
@dlt.table(comment="null value validations for {column}")
@dlt.expect_or_drop("null values", "is_null == false")
def null_validation_orders_for_column_{column}():
    df = dlt.read("bronze_orders")
    return df.withColumn("is_null", col("{column}").isNull())
''')

Solution

  • You can use @dlt.expect_all_or_fail(expectations)

    This declares one or more data quality constraints. expectations is a Python dictionary where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, execution stops immediately.

    Create expectations for all of your columns as shown below.

    # Build an IS NOT NULL expectation for every column
    expectations = {}
    for i in columns_list:
        expectations["valid_" + i] = f"{i} IS NOT NULL"
    expectations  # display the generated dictionary
    

    Output:

    {'valid_state': 'state IS NOT NULL',
     'valid_store_id': 'store_id IS NOT NULL',
     'valid_product_category': 'product_category IS NOT NULL',
     'valid_SKU': 'SKU IS NOT NULL',
     'valid_price': 'price IS NOT NULL'}
    

    Then, add this to your code as shown below.

    import dlt

    columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']

    # Build an IS NOT NULL expectation for every column
    expectations = {}
    for i in columns_list:
        expectations["valid_" + i] = f"{i} IS NOT NULL"

    @dlt.table()
    @dlt.expect_all_or_fail(expectations)
    def clickstream_raw():
      # schema is the StructType for the CSV files, defined elsewhere
      return (
        spark.read.schema(schema)
          .option("header", "true")
          .format("csv")
          .load("/path/csv/")
      )
    

    If any column contains a null, the pipeline raises an expectation violation error and the update fails. If you are reading CSV data, set the header option to true; otherwise the header row is read as a data record, the column names end up as values, and the load will error out.
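
    The same dictionary also works with the other batch expectation decorators: @dlt.expect_all (keep violating rows but record them in the data quality metrics) and @dlt.expect_all_or_drop (drop violating rows). Below is a minimal sketch of the drop variant; the table name orders_cleaned is illustrative and the source bronze_orders is borrowed from your question, so substitute your own names.

    import dlt

    columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']
    # Same IS NOT NULL expectation per column, built with a dict comprehension
    expectations = {f"valid_{c}": f"{c} IS NOT NULL" for c in columns_list}

    @dlt.table(comment="Rows containing a null in any column are dropped.")
    @dlt.expect_all_or_drop(expectations)  # drop bad rows instead of failing the update
    def orders_cleaned():
        # "bronze_orders" is an illustrative upstream table name from the question
        return dlt.read("bronze_orders")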