I have 3 Kafka topics: optionsTopic, stocksTopic, and referencesTopic. All are deployed locally on a single Kafka (v7.7.1) instance in KRaft mode. To reduce complexity, I am using a single partition for each of these topics. My goal is to merge the messages in these topics, so I developed a Flink (v1.20) application that performs temporal joins and writes the result to another Kafka topic, processedOptionsTopic. Unfortunately, the application yields inconsistent results between local executions (each of which just replays the same dataset through Kafka). Sometimes everything goes well and all records are matched correctly, but other times the joined columns for a symb that already exists in the stockTrades table come back as nulls (same data, fresh environment).
Here's how each source was defined:
// new options produced every 1s
tEnv.createTemporaryTable("optionTrades", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.fromColumns(SchemaBuilders.forTrades())
.watermark("tTime", "tTime - INTERVAL '5' SECOND")
.build())
.option("topic", "optionsTopic")
.option("properties.bootstrap.servers", kafkaBServers)
.option("format", "json")
.option("scan.startup.mode", "earliest-offset")
.option("json.timestamp-format.standard", "ISO-8601")
.build());
// new stocks produced every 1s
tEnv.createTemporaryTable("stockTrades", TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.fromColumns(SchemaBuilders.forTrades())
.watermark("tTime", "tTime - INTERVAL '5' SECOND")
.primaryKey("symb")
.build())
.option("topic", "stocksTopic")
.option("properties.bootstrap.servers", kafkaBServers)
.option("key.format", "raw")
.option("value.format", "json")
.option("value.json.timestamp-format.standard", "ISO-8601")
.build());
// new refs are produced every 30 min
tEnv.createTemporaryTable("tickersReference", TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.fromColumns(SchemaBuilders.forRefs())
.watermark("rtTime", "rtTime - INTERVAL '3' SECOND")
.primaryKey("optionSymb")
.build())
.option("topic", "referencesTopic")
.option("properties.bootstrap.servers", kafkaBServers)
.option("key.format", "raw")
.option("value.format", "json")
.option("value.json.timestamp-format.standard", "ISO-8601")
.build());
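As a reference point for the watermark declarations above: tTime - INTERVAL '5' SECOND says rows may arrive up to 5 seconds out of order, so each source's watermark trails the largest tTime it has seen by 5 seconds. The class below is a simplified plain-Java model of that rule (a hypothetical helper, not Flink's watermark generator), with timestamps in epoch milliseconds.

```java
import java.util.List;

// Simplified model (not Flink's implementation) of a watermark declared as
// tTime - INTERVAL '5' SECOND: the watermark trails the largest event time seen.
final class BoundedDelayWatermark {
    private final long delayMs;
    private long maxSeenMs = Long.MIN_VALUE;

    BoundedDelayWatermark(long delayMs) {
        this.delayMs = delayMs;
    }

    // Records an event time and returns the current watermark. Watermarks never
    // move backwards, so an out-of-order (older) row leaves the watermark unchanged.
    long onEvent(long eventTimeMs) {
        maxSeenMs = Math.max(maxSeenMs, eventTimeMs);
        return maxSeenMs - delayMs;
    }

    public static void main(String[] args) {
        BoundedDelayWatermark wm = new BoundedDelayWatermark(5_000);
        for (long t : List.of(10_000L, 11_000L, 9_000L, 12_000L)) {
            System.out.println("event at " + t + " ms -> watermark " + wm.onEvent(t) + " ms");
        }
    }
}
```

One consequence relevant here: a source that emits nothing (like referencesTopic between its 30-minute updates) never moves its own watermark forward, which is why idleness handling comes into play at all.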
Here's the sink definition:
tEnv.createTemporaryTable("processedOptions", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.fromColumns(SchemaBuilders.forProcessedOptionTrades())
.build())
.option("topic", "processedOptionsTopic")
.option("properties.bootstrap.servers", kafkaBServers)
.option("sink.delivery-guarantee", "exactly-once")
.option("sink.transactional-id-prefix", "exactlyOncePrefix")
.option("format", "json")
.option("json.timestamp-format.standard", "ISO-8601")
.option("properties.transaction.timeout.ms", "900000")
.build());
The optionTrades and stockTrades tables have a symb column that is mapped through the tickersReference table's optionSymb and stockSymb columns, respectively. This is how the temporal join is done:
Table mergedStreams = tEnv.sqlQuery(
"SELECT optionTrades.*, \n" +
// other fields were omitted for conciseness...
"tickersReference.rtTime AS rtTime \n" +
"FROM optionTrades \n" +
"LEFT JOIN tickersReference \n"+
"FOR SYSTEM_TIME AS OF optionTrades.tTime \n"+
"ON optionTrades.symb = tickersReference.optionSymb"
);
tEnv.createTemporaryView("optionTradeAndReference", mergedStreams);
tEnv.executeSql("INSERT INTO processedOptions \n" +
"SELECT optionTradeAndReference.*, \n" +
"stockTrades.tId AS stId, \n" +
// other fields were omitted for conciseness...
"stockTrades.tTime AS stTime \n" +
"FROM optionTradeAndReference \n" +
"LEFT JOIN stockTrades \n"+
"FOR SYSTEM_TIME AS OF optionTradeAndReference.tTime \n"+
"ON optionTradeAndReference.stockSymb = stockTrades.symb");
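For readers less familiar with event-time temporal joins, the sketch below models what LEFT JOIN ... FOR SYSTEM_TIME AS OF computes in the queries above: each probe row (an option trade) is matched with the latest version of the versioned table (e.g. stockTrades keyed by symb) whose time is at or before the probe row's tTime, and the probe row is only emitted once the join's watermark has passed its timestamp. This is a simplified plain-Java illustration with hypothetical class and method names and made-up symbols, not Flink's operator code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of an event-time LEFT temporal join (illustration only, not Flink's operator).
final class TemporalJoinModel {
    private static final class Probe {
        final String key;
        final long timeMs;

        Probe(String key, long timeMs) {
            this.key = key;
            this.timeMs = timeMs;
        }
    }

    // Versioned table: key -> (version time -> value), e.g. symb -> stock trade.
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
    // Probe-side rows (e.g. option trades) buffered until the watermark passes them.
    private final List<Probe> pending = new ArrayList<>();

    void onVersion(String key, long timeMs, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timeMs, value);
    }

    void onProbe(String key, long timeMs) {
        pending.add(new Probe(key, timeMs));
    }

    // Emits, in time order, every buffered probe row with time <= watermark, joined with
    // the latest version whose time is <= the probe time; "null" marks a LEFT JOIN miss.
    List<String> onWatermark(long watermarkMs) {
        pending.sort(Comparator.comparingLong(p -> p.timeMs));
        List<String> out = new ArrayList<>();
        Iterator<Probe> it = pending.iterator();
        while (it.hasNext()) {
            Probe p = it.next();
            if (p.timeMs > watermarkMs) {
                break; // rows are sorted, so every remaining row is later still
            }
            TreeMap<Long, String> history = versions.get(p.key);
            Map.Entry<Long, String> match = history == null ? null : history.floorEntry(p.timeMs);
            out.add(p.key + "@" + p.timeMs + " -> " + (match == null ? "null" : match.getValue()));
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        TemporalJoinModel join = new TemporalJoinModel();
        join.onVersion("AAPL", 1_000, "stock trade @1000");
        join.onProbe("AAPL", 2_000);
        join.onProbe("MSFT", 2_500); // no MSFT version has reached the join yet
        System.out.println(join.onWatermark(3_000));
    }
}
```

The point of the model is that the lookup happens when the watermark fires: a stock version that reaches the join after that moment cannot fix a row that was already emitted with nulls.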
I set table.exec.source.idle-timeout to 500 ms so that the rarely updated referencesTopic does not stall the watermarks, and I have also experimented with the watermark delays, but nothing really worked. I also have parallelism.default set to 1. I'd appreciate any pointers on how to fix this.
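First guess: the problem is the 500 ms idle timeout, which could explain why you are getting inconsistent results. When a source produces nothing for that long, Flink marks it idle and leaves it out of the watermark calculation, so the join's watermark can advance on the option input alone. If the stock or reference records have not been read yet at that moment (for example, because that consumer started delivering a bit later), the option rows are joined before their matching versions arrive and come out with nulls, and whether this happens depends on timing. Try setting the timeout higher (e.g., several seconds) and see what happens.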
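A rough way to see this race: a two-input operator's watermark is the minimum of its inputs' watermarks, but inputs marked idle are left out of that minimum. The plain-Java model below (hypothetical names, made-up event times and start-up delays, not Flink's code) marks the stock input idle if it has produced nothing within idleTimeoutMs of processing time, and reports what one option row ends up joined with.

```java
// Simplified model of how an idle timeout can decide a temporal join's outcome.
// Not Flink code: timings, event times and class/method names are hypothetical.
final class IdleTimeoutRace {
    static final long OPTION_TIME = 10_000; // event time of the option row we follow
    static final long STOCK_TIME = 9_000;   // event time of its matching stock row
    static final long BACKLOG_MAX = 20_000; // largest event time in each replayed topic
    static final long DELAY = 5_000;        // tTime - INTERVAL '5' SECOND

    // Steps processing time in 100 ms ticks. The options backlog is read at t = 0 and the
    // option input is assumed to stay active; the stocks backlog is only read from
    // t = stockStartMs. Returns what the option row is joined with when it is emitted.
    static String simulate(long idleTimeoutMs, long stockStartMs) {
        long optionWm = BACKLOG_MAX - DELAY;
        long stockWm = Long.MIN_VALUE; // no watermark until the first stock row is read
        boolean stockSeen = false;
        for (long now = 0; now <= 60_000; now += 100) {
            if (now >= stockStartMs) {
                stockSeen = true;
                stockWm = BACKLOG_MAX - DELAY;
            }
            // A stock input that has produced nothing for idleTimeoutMs is marked idle
            // and left out of the minimum that forms the join's watermark.
            boolean stockIdle = !stockSeen && now >= idleTimeoutMs;
            long joinWm = stockIdle ? optionWm : Math.min(optionWm, stockWm);
            if (joinWm >= OPTION_TIME) {
                // The join fires now and can only use stock versions already received.
                return stockSeen ? "stock@" + STOCK_TIME : "null";
            }
        }
        return "not emitted";
    }

    public static void main(String[] args) {
        System.out.println("500 ms timeout, stocks start after 200 ms:  " + simulate(500, 200));
        System.out.println("500 ms timeout, stocks start after 1000 ms: " + simulate(500, 1_000));
        System.out.println("5 s timeout,    stocks start after 1000 ms: " + simulate(5_000, 1_000));
    }
}
```

With identical data, the outcome flips depending only on whether the stock input delivers its first record before the timeout expires, which is consistent with the run-to-run differences described in the question. In the real job the corresponding knob is the table.exec.source.idle-timeout option, which could be raised with, for example, tEnv.getConfig().set("table.exec.source.idle-timeout", "5 s").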