I am looking for a way to trigger my Databricks notebook once to process a Kinesis stream, using the following pattern:
import org.apache.spark.sql.streaming.Trigger
// Load your Streaming DataFrame
val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")
It looks like this is not possible with AWS Kinesis, and that's what the Databricks documentation suggests as well. My question is: what else can we do to achieve that?
Since Databricks Runtime (DBR) 13.3, it's possible to use Trigger.AvailableNow (.trigger(availableNow=True) in PySpark) to process data in batches from Kinesis.
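A minimal sketch in the Scala API, to show the shape of it (the stream name, region, and paths here are hypothetical, and the Kinesis source options may vary with your connector version):
import org.apache.spark.sql.streaming.Trigger
// Read from Kinesis via the Databricks Kinesis connector
val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")         // hypothetical stream name
  .option("region", "us-east-1")             // hypothetical region
  .option("initialPosition", "trim_horizon") // start from the oldest available records
  .load()
// Process everything available at start time, then let the query stop
kinesisDF.writeStream
  .trigger(Trigger.AvailableNow())
  .option("checkpointLocation", "/chk/path") // hypothetical checkpoint path
  .format("delta")
  .start("/out/path")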
Previously, we were running a cluster 24/7 with .trigger(processingTime="300 seconds") in order to process a new batch every 5 minutes (note that in PySpark, processingTime takes an interval string, not a bare number).
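For reference, that old always-on pattern in the Scala API from the question would look roughly like this:
// New micro-batch every 5 minutes; the cluster stays up around the clock
sdf.writeStream
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .format("delta")
  .start("/out/path")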
Now, we process one batch at a time and let the run complete. We have also configured the job with a continuous trigger, so that as soon as one run completes, another is triggered (though it takes a while for the cluster to start, so the next run doesn't begin immediately).
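The continuous trigger lives in the job definition; a sketch of the relevant part of a Jobs API 2.1 payload (the job name is hypothetical):
{
  "name": "kinesis-availablenow-job",
  "continuous": {
    "pause_status": "UNPAUSED"
  }
}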
This has allowed us to reduce the costs for that job by over 50%. 💸
Bonus: We can now do other things in between the runs/batches, such as vacuuming the table (using an inventory table, of course).
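That inventory-based vacuum would look roughly like this; a sketch assuming the Databricks VACUUM ... USING INVENTORY syntax (DBR 13.3+), with hypothetical table names, where the inventory query must expose the columns VACUUM expects:
// Vacuum using a precomputed file inventory instead of listing the table's storage
spark.sql("""
  VACUUM my_table
  USING INVENTORY (
    SELECT path, length, isDir, modificationTime FROM my_inventory
  )
  RETAIN 168 HOURS
""")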