apache-flink flink-sql flink-table-api

Flink: Temporal Join not emitting data


I'm trying to implement an event-time temporal join, but no data is emitted from the join. I don't see any runtime exceptions either.

Flink Version: 1.13

Kafka topics have only 1 partition for now

Here's how I set it up:

I have an "append-only" DataStream (left input/probe side) which looks like the following:

{
"eventType": String,
"eventTime": LocalDateTime,
"eventId": String
}

I convert this DataStream to a table before joining:

var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
.column("eventId", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.column("eventType", DataTypes.STRING())
.watermark("eventTime", $("eventTime"))
.build());

Then, I have the "versioned table" (right input/build side) backed by Kafka (Debezium CDC changelog) which looks like the following:

CREATE TABLE metadata (
id                  VARCHAR,
eventMetadata       VARCHAR,
origin_ts           TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED,
WATERMARK FOR origin_ts AS origin_ts
) WITH (
'connector'                           = 'kafka',
'properties.bootstrap.servers'        = 'SERVER_ADDR',
'properties.group.id'                 = 'SOME_GROUP',
'topic'                               = 'SOME_TOPIC',
'scan.startup.mode'                   = 'latest-offset',
'value.format'                        = 'debezium-json'
)

The join query looks like this:

SELECT e.eventId, e.eventTime, e.eventType, m.eventMetadata
FROM events_view AS e
JOIN metadata_view FOR SYSTEM_TIME AS OF e.eventTime AS m
ON e.eventId = m.id

Following some other post on here, I've set the source idle-timeout:

table.exec.source.idle-timeout -> 5

I've also tried setting an idleness timeout on the watermarks so that an idle source doesn't hold back watermark emission. At this point I can see watermarks being generated, but I still don't get any results. Everything just ends up sitting in the temporal join operator.


Solution

  • The problem was the syntax of the processing-time temporal join. Here's how to fix it:

    // register the metadata table as a temporal table function by specifying its time attribute (proc_time) and primary key (id)
        var metadataHistory = tableEnv.from("metadata")
                .createTemporalTableFunction($("proc_time"), $("id"));
    
        tableEnv.createTemporarySystemFunction("metadata_view", metadataHistory);
    
        // sql processing time temporal join
        var temporalJoinResult = tableEnv.sqlQuery("SELECT" +
                " e.eventId, e.eventType, e.eventTime, m.eventMetadata" +
                " FROM events_view AS e," +
                // pass the probe side's processing-time attribute; the alias must match "e" above
                " LATERAL TABLE (metadata_view(e.procTime)) AS m" +
                " WHERE e.eventId = m.id");
    

    Here, proc_time must be declared as a computed column in the metadata table DDL, like this:

    CREATE TABLE metadata (
    id VARCHAR,
    eventMetadata VARCHAR,
    proc_time as PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector'                           = 'kafka',
    'properties.bootstrap.servers'        = 'SERVER_ADDR',
    'properties.group.id'                 = 'SOME_GROUP',
    'topic'                               = 'SOME_TOPIC',
    'scan.startup.mode'                   = 'latest-offset',
    'value.format'                        = 'debezium-json'
    )
    

    When converting the DataStream to a table, add the procTime column to that table as well, like this:

    var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
    .column("eventId", DataTypes.STRING())
    .column("eventTime", DataTypes.TIMESTAMP(3))
    .column("eventType", DataTypes.STRING())
    .columnByExpression("procTime", "PROCTIME()")
    .build());
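
    For context on why the event-time version in the question can look stuck: an event-time temporal join buffers probe-side rows until the operator's watermark, which is the minimum of the watermarks of both inputs, has passed the row's event time. If the versioned side rarely changes (and is read from 'latest-offset'), its watermark may never advance, so nothing is emitted. The plain-Java toy model below only illustrates that rule. It is not Flink code; the class name TemporalJoinSketch and all of its methods are made up for this sketch, and it tracks a single key only.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Toy model of event-time temporal join buffering for ONE key. Hypothetical, not a Flink API. */
    public class TemporalJoinSketch {
        // Versions of the build-side row, keyed by the timestamp from which each version is valid.
        private final TreeMap<Long, String> versions = new TreeMap<>();
        // Probe-side event times that are still waiting for the watermark.
        private final List<Long> pendingEvents = new ArrayList<>();
        private long leftWatermark = Long.MIN_VALUE;
        private long rightWatermark = Long.MIN_VALUE;

        public void addVersion(long validFrom, String metadata) {
            versions.put(validFrom, metadata);
        }

        public void addEvent(long eventTime) {
            pendingEvents.add(eventTime);
        }

        public List<String> advanceLeftWatermark(long watermark) {
            leftWatermark = Math.max(leftWatermark, watermark);
            return emitReady();
        }

        public List<String> advanceRightWatermark(long watermark) {
            rightWatermark = Math.max(rightWatermark, watermark);
            return emitReady();
        }

        // Emits every buffered event whose time is covered by BOTH watermarks.
        private List<String> emitReady() {
            long combined = Math.min(leftWatermark, rightWatermark);
            List<String> out = new ArrayList<>();
            Iterator<Long> it = pendingEvents.iterator();
            while (it.hasNext()) {
                long eventTime = it.next();
                if (eventTime <= combined) {
                    // Pick the version that was valid at the event's timestamp.
                    Map.Entry<Long, String> version = versions.floorEntry(eventTime);
                    if (version != null) {
                        out.add(eventTime + "->" + version.getValue());
                    }
                    // Inner-join semantics: events without a matching version are dropped.
                    it.remove();
                }
            }
            return out;
        }
    }
    ```

    In this model, advancing only the left watermark emits nothing; the buffered event is released once the right watermark catches up as well. The processing-time join above avoids this wait because it looks up the latest known version as soon as each event arrives, at the cost of not being reproducible against historical versions.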