Tags: apache-flink, flink-streaming, flink-table-api

No results in Kafka topic sink when applying tumble window aggregation in Flink Table API


I am using Flink 1.14, deployed with the Lyft Flink operator.

I am trying to compute a tumbling window aggregate with the Table API: read from the transactions source table and write the per-window aggregate result into a new Kafka topic.

My source is a Kafka topic populated by Debezium:

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

    // this is the source
    tEnv.executeSql("CREATE TABLE transactions (\n" +
            "  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
            "  transaction_time AS TO_TIMESTAMP_LTZ(4001, 3),\n" +
            "  id INT PRIMARY KEY,\n" +
            "  transaction_status STRING,\n" +
            "  transaction_type STRING,\n" +
            "  merchant_id INT,\n" +
            "  WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            "  'debezium-json.schema-include' = 'true',\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'dbserver1.inventory.transactions',\n" +
            "  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'debezium-json'\n" +
            ")");

I apply the tumbling window and count the ids per window with:

public static Table report(Table transactions) {
    return transactions
            .window(Tumble.over(lit(2).minutes()).on($("transaction_time")).as("w"))
            .groupBy($("w"), $("transaction_status"))
            .select(
                    $("w").start().as("window_start"),
                    $("w").end().as("window_end"),
                    $("transaction_status"),
                    $("id").count().as("id_count"));
}
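
For reference, the same aggregation can also be expressed as a group-window SQL query; this is a sketch of a roughly equivalent formulation (the variable name reportSql is mine), not code from the original job:

// Sketch: SQL group-window equivalent of report(), assuming the transactions table above
Table reportSql = tEnv.sqlQuery(
        "SELECT\n" +
        "  TUMBLE_START(transaction_time, INTERVAL '2' MINUTE) AS window_start,\n" +
        "  TUMBLE_END(transaction_time, INTERVAL '2' MINUTE) AS window_end,\n" +
        "  transaction_status,\n" +
        "  COUNT(id) AS id_count\n" +
        "FROM transactions\n" +
        "GROUP BY TUMBLE(transaction_time, INTERVAL '2' MINUTE), transaction_status");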

The sink is:

tEnv.executeSql("CREATE TABLE my_report (\n" +
            "window_start TIMESTAMP(3),\n"+
            "window_end TIMESTAMP(3)\n,"+
            "transaction_status STRING,\n" +
            " id_count BIGINT,\n" +
            " PRIMARY KEY (window_start) NOT ENFORCED\n"+
            ") WITH (\n" +
            " 'connector' = 'upsert-kafka',\n" +
            " 'topic' = 'dbserver1.inventory.my-window-sink',\n" +
            " 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.kafka.svc:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'key.format' = 'json',\n"+
            " 'value.format' = 'json'\n"+
            ")");
    Table transactions = tEnv.from("transactions");
    Table merchants = tEnv.from("merchants");
    report(transactions).executeInsert("my_report");

The problem: when I consume dbserver1.inventory.my-window-sink with

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.my-window-sink --from-beginning

I don't get any results. I wait 2 minutes (the window size), insert into the transactions table, then wait another 2 minutes and insert again, and there are still no results. I don't know if I have a problem with my watermark.

I am running the job with parallelism 2.

On the Flink dashboard UI, in the details of the GroupWindowAggregate task, I can see that Records Received increases when I insert into the table, but I still can't see any results when I consume the topic!


Solution

  • In addition to what David thankfully answered, I was missing table.exec.source.idle-timeout in the configuration of the streaming environment. This option controls when a source is marked as idle after producing no records; its default value of 0 means idleness is never detected. Because the watermark of a downstream operator is the minimum over all its parallel inputs, a source subtask that never receives data holds the watermark back, so the window never fires. I set the option to 1000 ms and that fixed it: the idle source is detected and the watermarks advance properly. This probably won't affect regular streams with consistent message ingestion, but it was the issue in my case because I was inserting records manually, so the stream was idle much of the time.
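
    A minimal sketch of setting this option programmatically on the table environment (the key and the 1000 ms value come from the answer above; the surrounding setup mirrors the code in the question):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Same environment setup as in the question.
    EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Mark a source as idle after 1 second without records so it no longer
    // holds back the watermark. The default of 0 disables idleness detection.
    tEnv.getConfig().getConfiguration()
            .setString("table.exec.source.idle-timeout", "1000 ms");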