Tags: apache-kafka, apache-flink, flink-streaming, flink-sql

Flink temporal join inconsistencies


I have three Kafka topics: optionsTopic, stocksTopic, and referencesTopic, all deployed locally on a single Kafka (v7.7.1) instance in KRaft mode. To reduce complexity, each topic has a single partition. My goal is to merge the messages from these topics, so I developed a Flink (v1.20) application that performs temporal joins and writes the result to another Kafka topic, processedOptionsTopic. Unfortunately, the application yields inconsistent results across local executions, even though each run just replays the same dataset into Kafka. Sometimes everything goes well and all records are matched, but other times the columns joined from stockTrades come back as nulls, even for a symb that already exists in the stockTrades table (same data, fresh environment).

Here's how each source was defined:

// new options produced every 1s 
tEnv.createTemporaryTable("optionTrades", TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .fromColumns(SchemaBuilders.forTrades())
        .watermark("tTime", "tTime - INTERVAL '5' SECOND")
        .build())
    .option("topic", "optionsTopic")
    .option("properties.bootstrap.servers", kafkaBServers)
    .option("format", "json")
    .option("scan.startup.mode", "earliest-offset")
    .option("json.timestamp-format.standard", "ISO-8601")
    .build());
// new stocks produced every 1s 
tEnv.createTemporaryTable("stockTrades", TableDescriptor.forConnector("upsert-kafka")
    .schema(Schema.newBuilder()
        .fromColumns(SchemaBuilders.forTrades())
        .watermark("tTime", "tTime - INTERVAL '5' SECOND")
        .primaryKey("symb")
        .build())
    .option("topic", "stocksTopic")
    .option("properties.bootstrap.servers", kafkaBServers)
    .option("key.format", "raw")
    .option("value.format", "json")
    .option("value.json.timestamp-format.standard", "ISO-8601")
    .build());
// new refs are produced every 30 min
tEnv.createTemporaryTable("tickersReference", TableDescriptor.forConnector("upsert-kafka")
    .schema(Schema.newBuilder()
        .fromColumns(SchemaBuilders.forRefs())
        .watermark("rtTime", "rtTime - INTERVAL '3' SECOND")
        .primaryKey("optionSymb")
        .build())
    .option("topic", "referencesTopic")
    .option("properties.bootstrap.servers", kafkaBServers)
    .option("key.format", "raw")
    .option("value.format", "json")
    .option("value.json.timestamp-format.standard", "ISO-8601")
    .build());
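Each source above declares a watermark of the form tTime - INTERVAL '5' SECOND, i.e. a bounded out-of-orderness strategy. As a rough mental model (a simplified sketch, not Flink's internal implementation), the watermark trails the largest event time read so far by the declared delay, never moves backwards, and does not exist until the source has read at least one record. The class name BoundedOutOfOrdernessSketch and the timestamps are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model of the watermark declared by
 * .watermark("tTime", "tTime - INTERVAL '5' SECOND"): it trails the largest
 * event time seen so far by a fixed delay. Illustration only, not Flink code.
 */
class BoundedOutOfOrdernessSketch {
    private final Duration delay;
    private Instant maxSeen; // largest event time observed so far; null until the first record

    BoundedOutOfOrdernessSketch(Duration delay) {
        this.delay = delay;
    }

    /** Called for every record read from the topic. */
    void onEvent(Instant eventTime) {
        if (maxSeen == null || eventTime.isAfter(maxSeen)) {
            maxSeen = eventTime; // out-of-order records never move the watermark back
        }
    }

    /** Current watermark, or null if no record has been read yet. */
    Instant currentWatermark() {
        return maxSeen == null ? null : maxSeen.minus(delay);
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch options = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(5));
        System.out.println(options.currentWatermark()); // null: nothing read yet
        options.onEvent(Instant.parse("2024-01-01T10:00:10Z"));
        options.onEvent(Instant.parse("2024-01-01T10:00:07Z")); // out of order, within 5 s
        System.out.println(options.currentWatermark()); // 2024-01-01T10:00:05Z
    }
}
```

The important consequence for the joins below is the "no watermark until the first record" case: a source that has not delivered anything yet cannot advance, so it holds back any operator that waits on it, unless it is declared idle.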

Here's the sink definition:

tEnv.createTemporaryTable("processedOptions", TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .fromColumns(SchemaBuilders.forProcessedOptionTrades())
        .build())
    .option("topic", "processedOptionsTopic")
    .option("properties.bootstrap.servers", kafkaBServers)
    .option("sink.delivery-guarantee", "exactly-once")
    .option("sink.transactional-id-prefix", "exactlyOncePrefix")
    .option("format", "json")
    .option("json.timestamp-format.standard", "ISO-8601")
    .option("properties.transaction.timeout.ms", "900000")
    .build());

The optionTrades and stockTrades tables each have a symb column, which is mapped through the tickersReference table's optionSymb and stockSymb columns, respectively. This is how the temporal joins are done:

Table mergedStreams = tEnv.sqlQuery(
    "SELECT optionTrades.*, \n" +
        // other fields were omitted for conciseness... 
        "tickersReference.rtTime AS rtTime \n" +
    "FROM optionTrades \n" +
    "LEFT JOIN tickersReference \n"+
        "FOR SYSTEM_TIME AS OF optionTrades.tTime \n"+
        "ON optionTrades.symb = tickersReference.optionSymb"
);

tEnv.createTemporaryView("optionTradeAndReference", mergedStreams);
tEnv.executeSql("INSERT INTO processedOptions \n" +
    "SELECT optionTradeAndReference.*, \n" +
        "stockTrades.tId AS stId, \n" +
        // other fields were omitted for conciseness... 
        "stockTrades.tTime AS stTime \n" +
    "FROM optionTradeAndReference \n" +
    "LEFT JOIN stockTrades \n"+
        "FOR SYSTEM_TIME AS OF optionTradeAndReference.tTime \n"+
        "ON optionTradeAndReference.stockSymb = stockTrades.symb");

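Conceptually, FOR SYSTEM_TIME AS OF optionTradeAndReference.tTime picks, for each option row, the stockTrades version with the latest time not after the option's tTime; with a LEFT JOIN, a missing version turns into null columns. The sketch below is a simplified in-memory model of that lookup (not Flink's operator; deletes are not modeled), and VersionedTableSketch and the symbol AAPL are hypothetical. It shows that the result depends on which stock versions have already been read when the lookup fires, and in an event-time temporal join Flink fires it once the join's watermark has passed the option row's tTime:

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified in-memory model of a versioned table such as stockTrades
 * (primary key symb, version time tTime) and of the FOR SYSTEM_TIME AS OF lookup.
 * Deletes (upsert tombstones) are not modeled. Illustration only, not Flink code.
 */
class VersionedTableSketch<V> {
    // key -> (version time -> row); every upsert adds a version
    private final Map<String, TreeMap<Instant, V>> versions = new HashMap<>();

    void upsert(String key, Instant versionTime, V row) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(versionTime, row);
    }

    /** Row valid at asOf (latest version time <= asOf), or null, which a LEFT JOIN pads as null columns. */
    V lookupAsOf(String key, Instant asOf) {
        TreeMap<Instant, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Instant, V> version = history.floorEntry(asOf);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        VersionedTableSketch<String> stockTrades = new VersionedTableSketch<>();
        Instant optionTime = Instant.parse("2024-01-01T10:00:10Z");

        // The lookup fires before any AAPL stock row has been read: null columns.
        System.out.println(stockTrades.lookupAsOf("AAPL", optionTime)); // null

        // The same lookup after the 10:00:08 version has been read: matched.
        stockTrades.upsert("AAPL", Instant.parse("2024-01-01T10:00:08Z"), "AAPL@10:00:08");
        System.out.println(stockTrades.lookupAsOf("AAPL", optionTime)); // AAPL@10:00:08
    }
}
```

Same data, different timing: whether an option row sees its stock version depends only on whether that version was ingested before the join's watermark released the option row.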
I set table.exec.source.idle-timeout = 500 ms so that the rarely updated referencesTopic does not hold back the watermark, and I have also experimented with the watermark delays, but nothing has really worked. In addition, parallelism.default is set to 1. I'd appreciate any pointers on how to fix this.


Solution

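  • First guess: the problem is the idle-timeout of 500 ms. That could explain the inconsistent results. An event-time temporal join only emits an option row once the join's watermark has passed its tTime, and that watermark is normally the minimum of its inputs' watermarks. If stocksTopic (or referencesTopic) has not delivered any record for 500 ms, for example while the consumers are still starting up, that source is marked idle and the watermark is driven by optionsTopic alone. Option rows can then be joined before the matching stock versions have been read, which yields nulls, and whether this happens depends on timing from run to run. Try setting the timeout higher (e.g., several seconds) and see what happens.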
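To make the idleness argument concrete, here is a simplified model (a sketch under stated assumptions, not Flink's implementation) of how a two-input join derives its watermark: idle inputs are ignored, and an active input that has no watermark yet holds the combined watermark back. The class names IdlenessSketch and InputState, the timestamps, and the 800 ms startup delay for stocksTopic are all hypothetical:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

class IdlenessSketch {
    /** Combined watermark of a multi-input operator, or null if it cannot advance yet. */
    static Instant combinedWatermark(List<InputState> inputs, Duration idleTimeout) {
        Instant min = null;
        for (InputState in : inputs) {
            if (in.isIdle(idleTimeout)) {
                continue; // idle inputs do not hold the watermark back
            }
            if (in.watermark == null) {
                return null; // an active input without a watermark blocks progress
            }
            if (min == null || in.watermark.isBefore(min)) {
                min = in.watermark;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // Hypothetical startup moment: options are flowing, stocks have delivered nothing for 800 ms.
        List<InputState> inputs = List.of(
                new InputState("optionTrades", Instant.parse("2024-01-01T10:00:05Z"), Duration.ofMillis(100)),
                new InputState("stockTrades", null, Duration.ofMillis(800)));

        System.out.println(combinedWatermark(inputs, Duration.ofMillis(500))); // 2024-01-01T10:00:05Z
        System.out.println(combinedWatermark(inputs, Duration.ofSeconds(10))); // null
    }
}

/** One input of the join, as observed at some processing-time instant. */
final class InputState {
    final String name;
    final Instant watermark;        // null: this input has not produced a watermark yet
    final Duration sinceLastRecord; // processing time since its last record (or since startup)

    InputState(String name, Instant watermark, Duration sinceLastRecord) {
        this.name = name;
        this.watermark = watermark;
        this.sinceLastRecord = sinceLastRecord;
    }

    boolean isIdle(Duration idleTimeout) {
        return sinceLastRecord.compareTo(idleTimeout) >= 0;
    }
}
```

With the 500 ms timeout, the stocks input counts as idle, so the watermark jumps to the options watermark and option rows up to that time are joined against whatever stock state exists (possibly none). With a 10 s timeout, the join keeps waiting for stocksTopic to produce a watermark instead.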