Platform: Azure Databricks
Compute: Serverless
I create a Spark data frame by selecting a subset of records from a Delta table, then perform several transformations on it, each producing a new data frame. Before the transformations I had one record per id; afterwards I have multiple records per id, which is intentional. Once done, I delete the original subset of records from the source Delta table and insert the transformed records.
My problem: after deleting the initial subset of records from the source Delta table, the final, transformed data frame is empty, and I want to understand why. I confirmed that the final data frame had rows prior to the delete; once the delete runs, it has none.
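Reduced to its essentials, the behavior looks like this (demo and its category column are hypothetical stand-ins, just to show the shape of the problem):

from pyspark.sql import functions as F

# Hypothetical Delta table and column, purely for illustration.
df = spark.table("demo").filter(F.col("category") == "A")
print(df.count())   # non-zero: the matching rows exist at this point

spark.sql("DELETE FROM demo WHERE category = 'A'")

print(df.count())   # 0, even though df was built before the delete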
The following example clarifies what I'm doing. I have a Delta table foo with the following schema:
CREATE TABLE foo (id INT, discount STRING, baz STRING, price DECIMAL(10,2));
discount is a field that stores multiple values delimited by a pipe ("|"). For example, the following query might return 10%|20%|30%:
SELECT discount FROM foo LIMIT 1;
I want to denormalize the contents of discount, turning the first table below into the second:
+----+-------------+-----+--------+
| id | discount    | baz | price  |
+----+-------------+-----+--------+
|  1 | 10%|20%|30% | A   | 100.00 |
|  2 | 30%         | A   |  90.00 |
+----+-------------+-----+--------+
+----+----------+-----+-------+
| id | discount | baz | price |
+----+----------+-----+-------+
|  1 | 10%      | A   | 90.00 |
|  1 | 20%      | A   | 80.00 |
|  1 | 30%      | A   | 70.00 |
|  2 | 30%      | A   | 90.00 |
+----+----------+-----+-------+
To do this, I've got logic as follows:
from pyspark.sql import functions as F

# Working set: only rows with baz = 'A'.
df1 = spark.table("foo") \
    .filter(F.col("baz") == "A")

# Only rows whose discount field actually contains a pipe.
df2 = df1.filter(F.expr("position('|', discount)") > 0)

# One row per piped discount value; derive the multiplier
# (e.g. '20%' -> 0.80) and recompute the price.
df3 = df2 \
    .select(F.explode(F.split("discount", "\\|")).alias("discount_denorm"), "*") \
    .withColumn("new_discount", 1 - F.get(F.regexp_extract_all(F.col("discount_denorm"), F.lit(r'(\d+)%'), 1), 0) / 100) \
    .select(
        "id",
        F.col("discount_denorm").alias("discount"),
        "baz",
        (F.col("price") * F.col("new_discount")).cast("decimal(10,2)").alias("price")
    )
df3.count()  # returns the expected number of rows at this point
df2.select("id").createOrReplaceTempView("ids_to_delete")
spark.sql("""
DELETE FROM foo WHERE EXISTS (SELECT 1 FROM ids_to_delete WHERE foo.id = id)
""")
df3.write.format("delta").mode("append").saveAsTable("foo")
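The cause turned out to be Spark's lazy evaluation: df3 is a query plan, not materialized data (df3.count() runs the plan but caches nothing), so the final saveAsTable re-reads foo after the DELETE has already removed the source rows. One way to keep the DELETE-then-INSERT approach would be to materialize df3 before touching the table; a sketch I haven't tested on serverless:

# Materialize df3 and cut its lineage back to foo so the later delete
# can't affect it; persist() followed by an action would also work.
df3 = df3.localCheckpoint()

spark.sql("""
DELETE FROM foo WHERE EXISTS (SELECT 1 FROM ids_to_delete d WHERE foo.id = d.id)
""")
df3.write.format("delta").mode("append").saveAsTable("foo")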
In the end, though, I made this change with several CTEs and a merge. This approach avoided the need for a staging table and let me implement the change without separate DELETE and INSERT logic.
CTE #1: Get the working set of data from the source table.
CTE #2: Identify the records I want to change. In this case, it's those with pipes in a specific field.
CTE #3: Make the desired transformation to the subject records. Admission: I could've kept things more concise by merging this CTE with CTE #2.
CTE #4: Add a column denoting the rank (ordinal position) of each row within its subset of the data set. Since I'm adding several rows for a single source record, I order the rows for each source row.
CTE #5: Perform one last transformation on the data, calculating the discount multiplier (e.g., '20%' becomes 1 - 20/100 = 0.80). More importantly, remove the primary key from every row after the first within each group of new records.
MERGE: Merge my changes into the source table.
In some cases, I am denormalizing the source data set, creating several new records from a single record. CTE #5 removes the primary key from all of those new records except the first. This results in the MERGE updating the existing record and then adding the remaining records to the data set. The new records get a new primary key from the identity column, which is exactly what I want.
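The identity behavior is easy to verify in isolation: insert a row without a PKID (using the stuff table defined further down) and the column assigns one automatically:

-- PKID is omitted, so GENERATED BY DEFAULT AS IDENTITY assigns the
-- next value automatically.
INSERT INTO users.adam.stuff (promos, as_of_date, regular_price, discount_price)
VALUES ('20% off', '2025-09-15', 50.00, NULL);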
The following code demonstrates the logic:
WITH pricing AS (
-- Working set: the source rows for the day being processed.
SELECT * FROM users.adam.stuff WHERE as_of_date = '2025-09-15'
)
, pricing_promo_tiered AS (
-- Only rows whose promos field contains a pipe.
SELECT * FROM pricing WHERE position('|', promos) > 0
)
, pricing_promo_tiered_denorm AS (
-- One row per piped promo value.
SELECT explode(split(promos, '\\|+')) AS promo_parsed
, *
FROM pricing_promo_tiered
)
, pricing_promo_tiered_denorm_w_discount AS (
-- '20%' -> 0.80, the multiplier to apply to the regular price.
SELECT 1 - GET(REGEXP_EXTRACT_ALL(promo_parsed, '(\\d+)%', 1), 0) / 100 AS discount_percentage
, *
FROM pricing_promo_tiered_denorm
)
, pricing_promo_tiered_denorm_w_discount_and_pk AS (
-- Number each row within its source record. as_of_date is constant per
-- PKID, so the ordering is arbitrary; any one row may keep the PKID.
SELECT ROW_NUMBER() OVER (PARTITION BY PKID ORDER BY as_of_date) AS `promo_occurrence`
, *
FROM pricing_promo_tiered_denorm_w_discount
)
, pricing_current_day_promo_tiered_denorm_w_discount_price AS (
-- Keep the PKID only on the first row of each group; the rest become
-- inserts and pick up new identity values.
SELECT CASE
WHEN promo_occurrence = 1 THEN PKID ELSE NULL
END AS PKID
, promo_parsed AS `promos`
, as_of_date
, regular_price
, CAST(CEIL(regular_price * discount_percentage) AS DECIMAL(10,2)) AS `discount_price`
, NULL AS `SNM`
FROM pricing_promo_tiered_denorm_w_discount_and_pk
)
MERGE INTO users.adam.stuff AS trg
USING pricing_current_day_promo_tiered_denorm_w_discount_price AS src
ON trg.PKID = src.PKID
WHEN MATCHED THEN UPDATE SET
trg.promos = src.promos
, trg.as_of_date = src.as_of_date
, trg.regular_price = src.regular_price
, trg.discount_price = src.discount_price
, trg.SNM = src.SNM
WHEN NOT MATCHED THEN
INSERT (promos, as_of_date, regular_price, discount_price, SNM)
VALUES (src.promos, src.as_of_date, src.regular_price, src.discount_price, src.SNM);
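After the merge, the rows that went in without a PKID show up with freshly assigned identity values; a quick way to eyeball the result:

SELECT PKID, promos, regular_price, discount_price
FROM users.adam.stuff
ORDER BY PKID DESC
LIMIT 10;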
The following is a table schema if you want to try this out:
CREATE OR REPLACE TABLE users.adam.stuff (
PKID BIGINT GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 1)
, promos STRING
, as_of_date DATE
, regular_price DOUBLE
, discount_price DOUBLE
, SNM DOUBLE
);
The following is sample data for the above table:
USE CATALOG users;
USE SCHEMA adam;
INSERT INTO stuff (promos, as_of_date, regular_price, discount_price) VALUES (NULL, '2025-09-13', 92.50, NULL);
INSERT INTO stuff (promos, as_of_date, regular_price, discount_price) VALUES (NULL, '2025-09-13', 98.75, NULL);
INSERT INTO stuff (promos, as_of_date, regular_price, discount_price) VALUES ('25% off two month reservation', '2025-09-13', 105.00, NULL);
INSERT INTO stuff (promos, as_of_date, regular_price, discount_price) VALUES (NULL, '2025-09-13', 115.25, NULL);
INSERT INTO stuff (promos, as_of_date, regular_price, discount_price) VALUES (NULL, '2025-09-13', 101.80, NULL);
INSERT INTO stuff (promos, as_of_date, regular_price, discount_price) VALUES ('30% off three month reservation', '2025-09-13', 109.90, NULL);
You'll want more sample data than what's above. You can use this as the basis for creating additional sample data.
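Note that none of the rows above contain a pipe in promos, and they all use as_of_date = '2025-09-13' while the first CTE filters on '2025-09-15', so as written the merge matches nothing. A made-up row that exercises the tiered path:

INSERT INTO stuff (promos, as_of_date, regular_price, discount_price)
VALUES ('10% off one month|20% off two months|30% off three months', '2025-09-15', 100.00, NULL);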