I'm trying to implement an event-time temporal join, but no data is emitted from the join. I don't see any runtime exceptions either.
Flink Version: 1.13
Kafka topics have only 1 partition for now
Here's how I set it up:
I have an "append-only" DataStream (left input/probe side) which looks like the following:
{
"eventType": String,
"eventTime": LocalDateTime,
"eventId": String
}
So, I convert this DataStream to a table before joining:
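import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
.column("eventId", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.column("eventType", DataTypes.STRING())
// watermark = eventTime itself, i.e. no allowance for out-of-order rows
.watermark("eventTime", $("eventTime"))
.build());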
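For reference, `.watermark("eventTime", $("eventTime"))` uses the column itself as the watermark expression, so the watermark is simply the largest `eventTime` seen so far and leaves no slack for out-of-order rows. The plain-Java sketch below models that behaviour; it is not Flink's watermark generator, and `BoundedDelayWatermark` and `delayMillis` are hypothetical names. Following Flink's convention, a row whose timestamp is at or behind the watermark counts as late.

```java
// Conceptual model (not Flink code) of a watermark derived from an event-time column.
// delayMillis = 0 mirrors .watermark("eventTime", $("eventTime")): no out-of-order slack.
class BoundedDelayWatermark {
    private final long delayMillis;
    private long maxEventTime = Long.MIN_VALUE;

    BoundedDelayWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Record a row's event time and return the resulting watermark.
    long onEvent(long eventTimeMillis) {
        maxEventTime = Math.max(maxEventTime, eventTimeMillis);
        return currentWatermark();
    }

    // The watermark only moves when rows arrive; a quiet stream keeps the old value.
    long currentWatermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - delayMillis;
    }

    // A row at or behind the current watermark is considered late.
    boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= currentWatermark();
    }
}
```

If some slack is wanted, the Schema builder also accepts a SQL expression, e.g. `.watermark("eventTime", "eventTime - INTERVAL '5' SECOND")`; the 5-second bound there is only an example value.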
Then, I have the "versioned table" (right input/build side) backed by Kafka (Debezium CDC changelog) which looks like the following:
CREATE TABLE metadata (
id VARCHAR,
eventMetadata VARCHAR,
origin_ts TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED,
WATERMARK FOR origin_ts AS origin_ts
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'SERVER_ADDR',
'properties.group.id' = 'SOME_GROUP',
'topic' = 'SOME_TOPIC',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json'
)
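Conceptually, this DDL turns the Debezium changelog into a versioned table: each change to an `id` becomes a new version stamped with `origin_ts`, and `FOR SYSTEM_TIME AS OF t` selects the latest version whose timestamp is at or before `t`. Here is a minimal plain-Java model of that lookup, assuming millisecond timestamps and treating a Debezium delete as a version with no value (`VersionedTable` and its methods are hypothetical, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Conceptual model of a versioned table: primary key (id) -> versions ordered by origin_ts.
class VersionedTable {
    // id -> (version timestamp -> eventMetadata, or null for a delete)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    void upsert(String id, long originTs, String eventMetadata) {
        versions.computeIfAbsent(id, k -> new TreeMap<>()).put(originTs, eventMetadata);
    }

    // A delete is recorded as a version with no value.
    void delete(String id, long originTs) {
        versions.computeIfAbsent(id, k -> new TreeMap<>()).put(originTs, null);
    }

    // Version valid at asOfTs: the latest change with originTs <= asOfTs.
    Optional<String> lookup(String id, long asOfTs) {
        TreeMap<Long, String> history = versions.get(id);
        if (history == null) {
            return Optional.empty();
        }
        Map.Entry<Long, String> entry = history.floorEntry(asOfTs);
        return entry == null ? Optional.empty() : Optional.ofNullable(entry.getValue());
    }
}
```

Because the query uses an inner JOIN, a probe row whose lookup comes back empty (no version yet, or the row was deleted) produces no output at all.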
The join query looks like this:
SELECT e.eventId, e.eventTime, e.eventType, m.eventMetadata
FROM events_view AS e
JOIN metadata_view FOR SYSTEM_TIME AS OF e.eventTime AS m
ON e.eventId = m.id
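Why can this query stay silent even while watermarks are visible? An event-time temporal join has to hold each `events_view` row until it knows the `metadata` side has delivered every version up to that row's `eventTime`, and it measures that progress as the minimum of the two inputs' watermarks. The sketch below models only this gating in plain Java (`TemporalJoinGate` is a hypothetical name, not Flink's operator). If the `metadata` watermark never advances, for instance because `'scan.startup.mode' = 'latest-offset'` means only changes made after the job starts are read and none arrive, the buffered rows are never released.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Conceptual model of watermark-gated output in an event-time temporal join.
class TemporalJoinGate {
    private final List<Long> bufferedProbeTimes = new ArrayList<>();
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    // Probe-side rows are buffered, never emitted on arrival.
    void onProbeRow(long eventTime) {
        bufferedProbeTimes.add(eventTime);
    }

    // Each input advances its own watermark; watermarks never move backwards.
    List<Long> onLeftWatermark(long wm) {
        leftWatermark = Math.max(leftWatermark, wm);
        return releaseReady();
    }

    List<Long> onRightWatermark(long wm) {
        rightWatermark = Math.max(rightWatermark, wm);
        return releaseReady();
    }

    // The operator's progress is the slower of the two inputs.
    long combinedWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    int pending() {
        return bufferedProbeTimes.size();
    }

    // Return (i.e. "emit") every buffered row at or before the combined watermark.
    private List<Long> releaseReady() {
        long wm = combinedWatermark();
        List<Long> ready = new ArrayList<>();
        Iterator<Long> it = bufferedProbeTimes.iterator();
        while (it.hasNext()) {
            long t = it.next();
            if (t <= wm) {
                ready.add(t);
                it.remove();
            }
        }
        return ready;
    }
}
```

Advancing only the left watermark releases nothing; rows come out only once the right watermark catches up with their `eventTime`.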
Following some other post on here, I've set the source idle-timeout:
table.exec.source.idle-timeout -> 5
I've also tried setting an idleness timeout on the watermarks to make sure the source doesn't hold back watermark emission. At this point I can see watermarks being generated, but I still don't get any results; everything just ends up sitting in the temporal join operator.
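The idle timeout matters because of that minimum: an input that stops receiving records also stops advancing its watermark and holds back the whole join, and marking it idle removes it from the calculation until it becomes active again. Below is a plain-Java model of that rule, with the timeout simplified to an explicit `markIdle` call (`IdleAwareWatermark` is a hypothetical name). One detail worth checking in the setting above: Flink reads a duration without a unit as milliseconds, so `5` means 5 ms, while a value such as `5 s` spells out seconds.

```java
import java.util.Arrays;

// Conceptual model: combine per-input watermarks, skipping inputs marked idle.
class IdleAwareWatermark {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastCombined = Long.MIN_VALUE;

    IdleAwareWatermark(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // New progress on an input makes it active again.
    void onWatermark(int input, long wm) {
        watermarks[input] = Math.max(watermarks[input], wm);
        idle[input] = false;
    }

    void markIdle(int input) {
        idle[input] = true;
    }

    // Minimum over active inputs; the combined watermark never moves backwards,
    // and if every input is idle it simply stays where it was.
    long combined() {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            lastCombined = Math.max(lastCombined, min);
        }
        return lastCombined;
    }
}
```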
So, the problem here turned out to be the temporal join syntax; switching to a processing-time temporal join through a temporal table function fixed it. Here's how:
// register the metadata table as a temporal table function by specifying its time attribute (proc_time) and primary key (id)
var metadataHistory = tableEnv.from("metadata")
.createTemporalTableFunction($("proc_time"), $("id"));
// expose it to SQL under the name metadata_view
tableEnv.createTemporarySystemFunction("metadata_view", metadataHistory);
// SQL processing-time temporal join via the temporal table function
var temporalJoinResult = tableEnv.sqlQuery("SELECT" +
" e.eventId, e.eventType, e.eventTime, m.eventMetadata" +
" FROM events_view AS e," +
" LATERAL TABLE (metadata_view(e.procTime)) AS m" +
" WHERE e.eventId = m.id");
Here, proc_time needs to be declared as a computed column in the metadata table DDL, like this:
CREATE TABLE metadata (
id VARCHAR,
eventMetadata VARCHAR,
proc_time as PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'SERVER_ADDR',
'properties.group.id' = 'SOME_GROUP',
'topic' = 'SOME_TOPIC',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json'
)
And when converting the DataStream to a table, declare a procTime column on that table as well:
var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
.column("eventId", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.column("eventType", DataTypes.STRING())
.columnByExpression("procTime", "PROCTIME()")
.build());