I am using pyspark in MS Fabric. I have a logging table, and I am trying to delete older entries from it.
That's easy enough, given:
a table whose name is stored in TABLE_LOG
a column in that table whose name is stored in LOG_TIME
with dtype TimestampType
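To make the snippets below concrete, assume definitions along these lines (the values are placeholders, not the real ones):
# Placeholder setup -- the real values live elsewhere in the notebook.
TABLE_LOG = 'app_log'    # name of the Delta table under Tables/
LOG_TIME = 'log_time'    # timestamp column used to decide what to keep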
The following example would keep only the last two days of logs (very roughly, because of the integer cast):
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import IntegerType

RUNTIME = current_timestamp()
PERSISTENCE = 2  # days of logs to keep
SECONDS_IN_DAY = 24 * 60 * 60

# Read the table, compute each row's age in days, keep only the recent rows,
# and write the filtered result back over the original table.
log = spark.read.format('delta').load(f'Tables/{TABLE_LOG}')
log = log.withColumn('age', (RUNTIME.cast(IntegerType()) - log[LOG_TIME].cast(IntegerType())) / SECONDS_IN_DAY)
log = log.where(log.age < PERSISTENCE)
log.write.mode('overwrite').format('delta').save(f'Tables/{TABLE_LOG}')
In SQL, you would delete the matching rows in place (ideally partition by partition), which parallelizes nicely and avoids rewriting the whole table, so it should be more efficient.
But I want to keep using pyspark, because the whole environment (including the constants and variables above) is in python.
Is it possible to do an in-place delete like that using pyspark?
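For context, the kind of in-place statement I mean would look something like this if I dropped down to SQL (just a sketch; it assumes the Delta table is also reachable by name in the lakehouse catalog):
# Sketch only: an in-place DELETE issued through Spark SQL, reusing the python constants.
# Assumes TABLE_LOG is also the table's name in the lakehouse catalog.
spark.sql(f"""
    DELETE FROM {TABLE_LOG}
    WHERE {LOG_TIME} < current_timestamp() - INTERVAL {PERSISTENCE} DAYS
""")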
Okay, answering myself here. It works, at least for delta tables:
import delta
from pyspark.sql.functions import col, date_add

# Deletes the matching rows in place; only the files containing them are rewritten.
# Note: date_add() yields a date, so the cutoff falls on a day boundary.
dt = delta.DeltaTable.forPath(spark, f'Tables/{TABLE_LOG}')
dt.delete(col(LOG_TIME) < date_add(RUNTIME, -PERSISTENCE))
...and that's it. Thank you for listening :-)
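One footnote in case it helps someone: the delete only removes the rows logically; the underlying parquet files stay in storage until they fall outside the retention window and a vacuum runs. Continuing from the snippet above:
# Optional cleanup: physically drop files that are no longer referenced by the table
# and are older than the retention threshold (default 168 hours / 7 days).
dt.vacuum()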