My ultimate goal is to have a copy (in delta format) of an append-only incremental table that is in JDBC (SQL).
I have a batch process reading from the incremental append-only JDBC (SQL) table with spark.read (since .readStream is not supported for JDBC sources). Every day, the most recent day of data is saved as a delta table. Note that this is not an append-only delta table; rather, it is overwritten every day with the most recent day of data.
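Concretely, the daily batch step boils down to something like the following sketch, where source_day_df stands in for the most recent day of data already read from JDBC (a placeholder, not my actual code):

# Sketch of the daily overwrite step; source_day_df is assumed to already
# hold the most recent day of data read from the JDBC source.
(
    source_day_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("daily_batch")
)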
What I think I need next is spark.readStream out of the delta table, with .option("skipChangeCommits", "true"). The Databricks documentation here outlines exactly this.
I have selected the Preview channel in the pipeline settings.
The code for the final streaming table in my DLT pipeline is:
@table(
    name='streaming table',
)
def create_df():
    return spark.readStream.option("skipChangeCommits", "true").table("daily_batch")
However, the error here is: NameError: name 'table' is not defined
In case it is a typo in the documentation, I have also tried dlt.table, and the error is:
pyspark.errors.exceptions.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view "daily_batch" cannot be found. Verify the spelling and correctness of the schema and catalog.
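For reference, this is the shape I would expect the definition to take, with an explicit import and the source referenced by its fully qualified name (the catalog and schema below are placeholders, and I have not verified that this resolves the lookup error):

# Sketch of a DLT table definition with an explicit import; the catalog and
# schema names are placeholders for wherever daily_batch actually lives.
import dlt

@dlt.table(name="streaming_table")
def create_df():
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("my_catalog.my_schema.daily_batch")
    )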
My solution to this was to simply use a regular job, not DLT. This simplifies things a lot, as there is no need for an intermediate table. It was just:
[JDBC incremental SQL] --Databricks job--> [delta table write in append-only mode].
I saved the watermark as metadata in the delta table with
.option("userMetadata", watermark.strftime("%Y-%m-%dT%H:%M:%S"))
And I read it back at the start of each subsequent batch run (dt.history() returns commits newest first, so .first() picks up the watermark from the latest commit):

import delta
from pyspark.sql import functions as F

dt = delta.tables.DeltaTable.forPath(spark, path)
watermark = dt.history().select(F.col("userMetadata")).first()[0]
I used spark.read.format("sqlserver") to query the JDBC server.
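For completeness, the read side of each run looks roughly like this sketch (the connection options, table name, and updated_at column are placeholders, and watermark is the string read back from the table history above):

# Sketch of the incremental read; connection options and names are placeholders.
from pyspark.sql import functions as F

source_df = (
    spark.read.format("sqlserver")
    .option("host", "<host>")
    .option("port", "1433")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("database", "<database>")
    .option("dbtable", "dbo.source_table")
    .load()
)

# Only keep rows newer than the stored watermark; the connector may push
# this filter down to SQL Server.
new_rows = source_df.filter(F.col("updated_at") > F.to_timestamp(F.lit(watermark)))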