databricks, spark-structured-streaming, real-time-data

How can I test end-to-end latency in Databricks Structured Streaming?


I'm new to Structured Streaming and I'm trying to do performance testing of reading data with Structured Streaming. I'd like to test different scenarios: for example, different cluster sizes, different numbers of messages per second, processing data between Delta tables, and writing back to different targets.

My problem is that I couldn't figure out how to measure end-to-end latency, or how long processing took. Unlike with batch processing, I can't just look at how long the notebook command ran, since a streaming query runs continuously. Does anyone know where to start? Is there a function, or somewhere in the Databricks UI, where I can find this information? Googling didn't really help me.

I tried looking at the notebook and the jobs UI, but it was hard to understand the latency between the different stages.


Solution

  • There's no built-in way in Databricks to get the end-to-end (e2e) latency of a Structured Streaming query, but I'll describe an approach that works for Structured Streaming in general.

    First, you can use the rate source to generate a fixed workload. Each rate-source record carries a timestamp column, which you can use as the source timestamp; rename it to something like source_timestamp with a withColumnRenamed (or withColumn) call (see the first sketch at the end of this answer).

    Next, if you're writing to a sink like Kafka, you should be able to instruct your broker (or topic) to stamp incoming records with their log-append time, as described in the Kafka documentation. Most other message queues should have similar functionality, but you'll have to check their docs. Either way, this ensures that your sink ends up with both a source timestamp and a sink timestamp for every record (see the second sketch below).

    From there, you can write a batch query that computes the difference between the sink timestamp and the source timestamp for each record, and then apply a percentile function to that column to get latency percentiles (see the third sketch below).

    If you're using something like a file sink, you might have to do some extra work to get a sink timestamp for each record. You might need some custom Spark code that goes through each created sink file, extracts its timestamp, and correlates it with the records in that file (see the fourth sketch below).

    Finally, you should note that calculating e2e latency depends a bit on the operators you're using: stateful operators such as aggregations and stream-stream joins, for example, hold records in state until the watermark advances, so their measured end-to-end latency will include that buffering time.

    (Disclaimer: I'm a Databricks employee who works on Structured Streaming.)
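
    Here's a rough sketch of the rate-source step in PySpark (the rowsPerSecond value and the source_timestamp name are just illustrative; spark is the notebook's SparkSession):

    ```python
    # The rate source emits `rowsPerSecond` records per second, each with a
    # `timestamp` column and a monotonically increasing `value` column.
    source_df = (
        spark.readStream
        .format("rate")
        .option("rowsPerSecond", 1000)  # tune to the message rate you want to test
        .load()
        .withColumnRenamed("timestamp", "source_timestamp")
    )
    ```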
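
    For the Kafka step, something like the sketch below should work, assuming the topic is configured with message.timestamp.type=LogAppendTime so the broker stamps each record with its log-append time on arrival; the bootstrap servers, topic name, and checkpoint location are placeholders:

    ```python
    from pyspark.sql import functions as F

    # Carry the source timestamp inside the record value; the broker adds the
    # sink-side (log-append) timestamp when the record lands on the topic.
    (
        source_df
        .select(F.to_json(F.struct("value", "source_timestamp")).alias("value"))
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
        .option("topic", "latency-test")                    # placeholder
        .option("checkpointLocation", "/tmp/checkpoints/latency-test")
        .start()
    )
    ```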
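
    The batch latency calculation could then look roughly like this, reading the sink topic back, computing the per-record difference, and aggregating percentiles (same placeholder Kafka settings as above):

    ```python
    from pyspark.sql import functions as F

    # Batch-read the sink topic; with LogAppendTime, Kafka's `timestamp` column
    # is the sink timestamp, and the source timestamp rides in the JSON value.
    sink_df = (
        spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
        .option("subscribe", "latency-test")                # placeholder
        .load()
        .select(
            F.col("timestamp").alias("sink_timestamp"),
            F.from_json(
                F.col("value").cast("string"),
                "value LONG, source_timestamp TIMESTAMP",
            ).alias("payload"),
        )
        .select("sink_timestamp", "payload.source_timestamp")
    )

    # Per-record latency in milliseconds, then p50/p95/p99 over the whole run.
    (
        sink_df
        .withColumn(
            "latency_ms",
            (F.col("sink_timestamp").cast("double")
             - F.col("source_timestamp").cast("double")) * 1000,
        )
        .select(F.expr("percentile(latency_ms, array(0.5, 0.95, 0.99))")
                .alias("latency_percentiles_ms"))
        .show(truncate=False)
    )
    ```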
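
    And for a file sink, one shortcut on Spark 3.2+ / recent Databricks runtimes is the hidden _metadata column of file-based sources: its file_modification_time field can stand in for the sink timestamp of every record in a given file. The path and format below are placeholders, and the records are assumed to still carry source_timestamp:

    ```python
    from pyspark.sql import functions as F

    # Each record's sink timestamp is approximated by the modification time of
    # the file it was written to.
    file_sink_df = (
        spark.read
        .format("parquet")            # whatever format your file sink writes
        .load("/path/to/file-sink")   # placeholder
        .select(
            "source_timestamp",
            F.col("_metadata.file_modification_time").alias("sink_timestamp"),
        )
    )
    # From here, reuse the same latency / percentile calculation as above.
    ```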