There are three types of quality checks in Delta Live Tables:

- expect (retain invalid records)
- expect_or_drop (drop invalid records)
- expect_or_fail (fail on invalid records)

I want to retain invalid records, but I also want to keep track of them. So, by using expect, can I query the invalid records, or is it just for keeping stats like "n records were invalid"?
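For reference, a minimal sketch of how the three decorators are applied (the table, source, and rule names here are made up):

import dlt

@dlt.table(name="clean_events")
@dlt.expect("valid_id", "id IS NOT NULL")            # retain invalid records, only track stats
# @dlt.expect_or_drop("valid_id", "id IS NOT NULL")  # drop invalid records
# @dlt.expect_or_fail("valid_id", "id IS NOT NULL")  # fail the update on invalid records
def clean_events():
    return dlt.read("raw_events")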
expect just records that you had some problems, so you get some statistics about your data quality in the pipeline. But on its own it's not very useful in practice.
Native quarantine functionality is still not available; that's why there is a recipe in the cookbook. Although it's not exactly what you need, you can still build on top of it, especially if you take into account the second part of the recipe that explicitly adds a Quarantine column. We can combine that with expect_all to get statistics into the UI:
import dlt
from pyspark.sql.functions import expr

rules = {}
...  # populate rules with {"rule_name": "SQL constraint"} entries, as in the cookbook recipe

# a row is quarantined when it violates at least one of the rules
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
    name="partitioned_farmers_market",
    partition_cols=["Quarantine"]
)
@dlt.expect_all(rules)  # records per-rule statistics in the pipeline UI, keeps all rows
def get_partitioned_farmers_market():
    return (
        dlt.read("raw_farmers_market")
        .withColumn("Quarantine", expr(quarantine_rules))  # flag invalid rows
        .select("MarketName", "Website", "Location", "State",
                "Facebook", "Twitter", "Youtube", "Organic", "updateTime",
                "Quarantine")
    )
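Once the pipeline has run, you can query the quarantined records directly by filtering on that column (a quick sketch; the schema/catalog in which the target table lands depends on your pipeline configuration):

# `spark` is the ambient SparkSession in a Databricks notebook
quarantined = spark.table("partitioned_farmers_market").where("Quarantine = true")
quarantined.show()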
Another approach would be to use the first part of the recipe (the one that uses expect_all_or_drop) and just union both tables (it's better to mark the valid/invalid tables with the temporary = True flag).
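A rough sketch of that approach, reusing raw_farmers_market from above (the rule and the table names here are just illustrative):

import dlt
from pyspark.sql.functions import lit

# hypothetical rule; in the recipe the rules are defined/loaded separately
rules = {"valid_website": "Website IS NOT NULL"}
quarantine_rules = {"quarantined": "NOT({0})".format(" AND ".join(rules.values()))}

@dlt.table(temporary=True)
@dlt.expect_all_or_drop(rules)          # keep only valid rows
def valid_farmers_market():
    return dlt.read("raw_farmers_market")

@dlt.table(temporary=True)
@dlt.expect_all_or_drop(quarantine_rules)  # keep only invalid rows
def quarantined_farmers_market():
    return dlt.read("raw_farmers_market")

@dlt.table(name="farmers_market_all")
def farmers_market_all():
    # union the two temporary tables, flagging which side each row came from
    return (
        dlt.read("valid_farmers_market").withColumn("Quarantine", lit(False))
        .unionByName(
            dlt.read("quarantined_farmers_market").withColumn("Quarantine", lit(True))
        )
    )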