I have a Kafka topic called A.
The format of the data in topic A is:
{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}
On the consumer side, I want only the latest record in each one-hour window. That is, every hour I need to get the latest value from the topic, based on created_at.
My expected output is:
{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
I think this can be solved with KSQL, but I'm not sure. Please help me.
Thanks in advance.
Yes, you can use KSQL for this. Try the following:
CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAR)
WITH (kafka_topic = 'topic_name', value_format = 'JSON');
CREATE TABLE maxRow AS
SELECT
id,
name,
max(STRINGTOTIMESTAMP(created_at, 'yyyy-MM-dd HH:mm:ss.SSS'))
AS created_at
FROM s1
WINDOW TUMBLING (size 1 hour)
GROUP BY id, name;
In the result, created_at will be a Unix epoch timestamp in milliseconds (a BIGINT), because that is what STRINGTOTIMESTAMP returns.
You can convert it back into your desired format using the TIMESTAMPTOSTRING UDF in a new query.
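For example, a follow-up query along these lines might do the conversion. This is a rough, untested sketch: it assumes the maxRow table from above already exists, maxRowFormatted is just a name I made up for illustration, and the exact behaviour may depend on your KSQL version.

```sql
-- Hypothetical follow-up query: maxRowFormatted is an illustrative name.
-- Assumes maxRow.created_at holds epoch milliseconds (BIGINT),
-- as produced by STRINGTOTIMESTAMP in the query above.
CREATE TABLE maxRowFormatted AS
  SELECT
    id,
    name,
    -- Convert epoch milliseconds back into a readable string.
    TIMESTAMPTOSTRING(created_at, 'yyyy-MM-dd HH:mm:ss.SSS') AS created_at
  FROM maxRow;
```

Note that the pattern letters are case-sensitive: MM is the month and HH is the 24-hour clock, whereas mm is minutes and hh is the 12-hour clock.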
Please let me know if you find any issues.