I want my Spark Structured Streaming query to write its output to the sink once, at the end of each tumbling window, and not at every batch interval.
I am reading from a Kafka stream and outputting to another Kafka stream.
The code that runs the query and writes the output looks like this:
Dataset<Row> sqlResult = session.sql("select window, user, sum(amount) as amount from users where type = 'A' group by window(timestamp, '1 minute', '1 minute'), user");
sqlResult = sqlResult.select(to_json(struct("window", "user", "amount")).as("value"));
StreamingQuery query = sqlResult.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "aggregated-topic")
    .option("checkpointLocation", "c:/tmp")
    .outputMode(OutputMode.Update())
    .start();
When I send multiple records for a particular user within a 1-minute window, I want a single sum of their amounts at the end of that minute.
Instead, I get multiple records on the output Kafka topic, with intermediate aggregations written to it.
For example, I send the following 7 records within a single one-minute window, spread out over that minute:
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
The output I get is:
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":10.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":20.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":40.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":60.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}
As you can see, all the outputs belong to the same window, but there are several of them.
What I want is a single output at the end of the minute:
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}
How can I achieve it?
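You need to set a processing-time trigger when writing the stream to the sink. In Update mode, every micro-batch writes the rows whose aggregates changed since the previous batch, so without an explicit trigger you get one intermediate sum per batch.
Use the trigger(Trigger.ProcessingTime(...)) method of DataStreamWriter with an interval that matches your window, here 1 minute (Trigger is in org.apache.spark.sql.streaming):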
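StreamingQuery query = sqlResult.writeStream()
    .trigger(Trigger.ProcessingTime("1 minute")) // fire one micro-batch per minute
    // format, options and outputMode stay exactly as in the question
    .start();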
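To see why the trigger interval changes the number of output records, here is a rough plain-Java model of Update mode for a single (window, user) key. It is only a sketch and not Spark code: the class name, the method name and the arrival times are made up for illustration, and it assumes that every record falls into the same event-time window and that one micro-batch fires per trigger interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of Update mode for ONE (window, user) key; not a Spark API.
class TriggerIntervalSketch {

    // Groups records into micro-batches by arrival time and returns the
    // running sum that would be written after each non-empty batch.
    static List<Double> emitPerTrigger(long[] arrivalMillis,
                                       double amountPerRecord,
                                       long triggerIntervalMillis) {
        // Count how many records arrive in each trigger interval, in time order.
        TreeMap<Long, Integer> recordsPerTrigger = new TreeMap<>();
        for (long arrival : arrivalMillis) {
            long bucket = Math.floorDiv(arrival, triggerIntervalMillis);
            recordsPerTrigger.merge(bucket, 1, Integer::sum);
        }

        // Each batch that changes the aggregate emits the updated running sum.
        List<Double> emitted = new ArrayList<>();
        double runningSum = 0.0;
        for (int count : recordsPerTrigger.values()) {
            runningSum += count * amountPerRecord;
            emitted.add(runningSum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed arrival offsets (ms) of 7 records inside one minute.
        long[] arrivals = {2_000, 9_000, 15_000, 17_000, 31_000, 38_000, 52_000};

        // Short interval: several intermediate sums are written.
        System.out.println(emitPerTrigger(arrivals, 10.0, 10_000));
        // One-minute interval: a single final sum is written.
        System.out.println(emitPerTrigger(arrivals, 10.0, 60_000));
    }
}
```

With the assumed arrival times, a 10-second interval yields four outputs and a 1-minute interval yields one. Keep in mind that this model ignores late data: if records for the same window arrive after the trigger has fired, Update mode will still write another row for that window.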