I'm having a problem understanding how to preserve the order of events when consuming records from a Kinesis stream with Flink. Our setup looks like this:
In Flink, we use the Table API to consume the Kinesis stream, do some processing and write the events to a (custom) synchronous HTTP sink. The desired outcome would be that each shard's processing subtask writes the events to the sink one after the other, waiting for the sink to return before writing the next event. To test that, we made the sink functions randomly do a Thread.sleep() for a few seconds before returning. Looking at the log output, we can now see this:
13:00:06.120 c.s.d.a.p.p.f.sinks.HttpSinkFunction - BLOCKING 802719369 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}
13:00:06.476 c.s.d.a.p.p.f.sinks.HttpSinkFunction - 1973378384 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}
The first line is from one of the blocking sinks, the second line is from a non-blocking sink. Both events are from the same user (= the same shard, see shardId in the JSON object) and were processed only a few milliseconds apart, even though the first sink sleeps for 10 seconds after writing its log line. That also means that the results will arrive at the HTTP endpoint out of order.
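To illustrate the test: the sink is essentially a SinkFunction that logs the record and then sometimes sleeps before returning. A heavily simplified sketch (the real HttpSinkFunction performs the actual HTTP call; the hash code here just stands in for whatever id we log):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadLocalRandom;

// Simplified test sink: logs each record and then randomly blocks for a few
// seconds before returning, simulating a slow synchronous HTTP endpoint.
public class HttpSinkFunction implements SinkFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(HttpSinkFunction.class);

    @Override
    public void invoke(String value, Context context) throws Exception {
        if (ThreadLocalRandom.current().nextBoolean()) {
            LOG.info("BLOCKING {} {}", value.hashCode(), value);
            Thread.sleep(10_000); // simulate a slow HTTP call
        } else {
            LOG.info("{} {}", value.hashCode(), value);
        }
    }
}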
I've studied the Flink documentation on parallelism and backpressure, but I'm still not sure how to achieve the desired behaviour. Is it possible to write output to one sink function per shard at a time, so that a shard's complete processing is delayed if a sink is responding slowly?
Update: More information on the setup
First of all, we define an input table (with the Kinesis connector) and an output table (with our custom http connector). Then we create a statement set, add a couple of insert SQLs to it and execute this set. The code looks pretty much like this (with extractionSqls being a list of query strings, see below):
StatementSet statementSet = tableEnv.createStatementSet();
for (String extractionSql : extractionSqls) {
    statementSet.addInsertSql(extractionSql);
}
statementSet.execute();
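For completeness, the two tables are defined with DDL via the Table API, roughly along these lines (a simplified sketch: the column list, watermark and connector options are illustrative, and the options of our custom http connector are omitted):

// Illustrative table definitions; the real ones differ in details.
tableEnv.executeSql(
    "CREATE TABLE input_table ("
        + " userId STRING,"
        + " eventType STRING,"
        + " eventTime TIMESTAMP(3),"
        + " payload MAP<STRING, STRING>,"
        + " shardId STRING,"
        + " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND"
        + ") WITH ("
        + " 'connector' = 'kinesis',"
        + " 'stream' = '...',"
        + " 'aws.region' = '...',"
        + " 'format' = 'json'"
        + ")");

tableEnv.executeSql(
    "CREATE TABLE output_table ("
        + " userId STRING,"
        + " action STRING,"
        + " eventTime TIMESTAMP(3),"
        + " properties MAP<STRING, STRING>,"
        + " shardId STRING"
        + ") WITH ("
        + " 'connector' = 'http'" // our custom connector, options omitted
        + ")");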
The insert SQLs all look pretty similar and are basically just extracting properties from the input events; there's also one window function involved (a tumbling window). An example SQL looks like this:
INSERT INTO output_table
SELECT userId, 'replace', eventTime, MAP['heroLevel',payload['newLevel']], shardId
FROM input_table
WHERE `eventType` = 'LEVELUP'
The idea is: whenever an event with type 'LEVELUP' arrives, we want to send an HTTP request to our API. Due to how the processing works later, we need to make sure that the events for a single user are sent in order and synchronously.
In the Flink dashboard, the resulting graph looks like this:
Given your requirements, the only way I can see to do this would be to bring all of the results for each user together so that they are written by the same instance of the sink.
Perhaps it would work to rewrite this as one large join (or union) keyed on the user id and sorted by timestamp. Or you might convert the results of the SQL queries into DataStreams that you key by the user id, and then implement some buffering and sorting in your custom sink.
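For example, a rough (untested) sketch of that second option, assuming Flink 1.13+ and that tableEnv is a StreamTableEnvironment; UserOrderedHttpSink is a hypothetical variant of your HttpSinkFunction that accepts Row and does any per-user buffering/sorting:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Run one extraction query, convert its (append-only) result into a DataStream
// and key it by userId, so that all events of a user go to the same sink subtask.
Table levelUps = tableEnv.sqlQuery(
    "SELECT userId, 'replace', eventTime, MAP['heroLevel', payload['newLevel']], shardId "
        + "FROM input_table WHERE eventType = 'LEVELUP'");

DataStream<Row> levelUpStream = tableEnv.toDataStream(levelUps);

levelUpStream
    .keyBy(row -> (String) row.getField(0), Types.STRING) // userId is the first column
    .addSink(new UserOrderedHttpSink()); // synchronous sink; sorts/buffers by eventTime per user

With several queries, you could union the resulting streams before the keyBy, so that all of a user's events, regardless of event type, pass through the same sink instance.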