I have a ksqlDB table with a multi-column (composite) primary key. I need to create a new stream and perform a LEFT JOIN with that table. However, it appears ksqlDB does not support LEFT JOINs with multiple join conditions.
ksqlDB version: 0.29.0
My code is as follows:
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM TEST_INITIAL_STREAM (
`VALUE_AS_PRIMARY_KEY1` STRING,
`VALUE_AS_PRIMARY_KEY2` STRING,
`VALUE_AS_PRIMARY_KEY3` STRING,
`VALUE_AS_PRIMARY_KEY4` STRING,
`ORDINARY_VALUE1` STRING,
`ORDINARY_VALUE2` STRING,
`MY_TIMESTAMP` STRING
) WITH (
KAFKA_TOPIC = 'example-topic1',
VALUE_FORMAT = 'JSON',
PARTITIONS = 3
);
CREATE STREAM TEST_PARSED_TIMESTAMP_STREAM WITH (
KAFKA_TOPIC = 'example-topic2'
) AS SELECT
VALUE_AS_PRIMARY_KEY1,
VALUE_AS_PRIMARY_KEY2,
VALUE_AS_PRIMARY_KEY3,
VALUE_AS_PRIMARY_KEY4,
ORDINARY_VALUE1,
ORDINARY_VALUE2,
PARSE_TIMESTAMP(MY_TIMESTAMP, 'yyyy-MM-dd') AS MY_TIMESTAMP_PARSED
FROM TEST_INITIAL_STREAM EMIT CHANGES;
CREATE TABLE TEST_MAX_TIME WITH (
KAFKA_TOPIC = 'example-topic3', FORMAT = 'JSON'
) AS SELECT
VALUE_AS_PRIMARY_KEY1,
VALUE_AS_PRIMARY_KEY2,
VALUE_AS_PRIMARY_KEY3,
VALUE_AS_PRIMARY_KEY4,
MAX(MY_TIMESTAMP_PARSED) AS MAX_TIMESTAMP
FROM TEST_PARSED_TIMESTAMP_STREAM
GROUP BY VALUE_AS_PRIMARY_KEY1, VALUE_AS_PRIMARY_KEY2, VALUE_AS_PRIMARY_KEY3, VALUE_AS_PRIMARY_KEY4
EMIT CHANGES;
CREATE STREAM TEST_FILTERED_STREAM WITH (
KAFKA_TOPIC = 'example-topic4'
) AS SELECT
T1.VALUE_AS_PRIMARY_KEY1,
T1.VALUE_AS_PRIMARY_KEY2,
T1.VALUE_AS_PRIMARY_KEY3,
T1.VALUE_AS_PRIMARY_KEY4,
T1.ORDINARY_VALUE1,
T1.ORDINARY_VALUE2,
T1.MY_TIMESTAMP_PARSED
FROM TEST_PARSED_TIMESTAMP_STREAM T1
LEFT JOIN TEST_MAX_TIME T2 ON (
T1.VALUE_AS_PRIMARY_KEY1 = T2.VALUE_AS_PRIMARY_KEY1 AND
T1.VALUE_AS_PRIMARY_KEY2 = T2.VALUE_AS_PRIMARY_KEY2 AND
T1.VALUE_AS_PRIMARY_KEY3 = T2.VALUE_AS_PRIMARY_KEY3 AND
T1.VALUE_AS_PRIMARY_KEY4 = T2.VALUE_AS_PRIMARY_KEY4
)
WHERE T1.MY_TIMESTAMP_PARSED >= T2.MAX_TIMESTAMP
EMIT CHANGES;
However, this fails with an Unsupported join expression error. What is the appropriate way to perform a LEFT JOIN against a table with a multi-column primary key?
I also tried keeping a single condition in the ON clause and moving the other three conditions to the WHERE clause, like so:
CREATE STREAM TEST_FILTERED_STREAM WITH (
KAFKA_TOPIC = 'example-topic4'
) AS SELECT
T1.VALUE_AS_PRIMARY_KEY1,
T1.VALUE_AS_PRIMARY_KEY2,
T1.VALUE_AS_PRIMARY_KEY3,
T1.VALUE_AS_PRIMARY_KEY4,
T1.ORDINARY_VALUE1,
T1.ORDINARY_VALUE2,
T1.MY_TIMESTAMP_PARSED
FROM TEST_PARSED_TIMESTAMP_STREAM T1
LEFT JOIN TEST_MAX_TIME T2 ON (
T1.VALUE_AS_PRIMARY_KEY1 = T2.VALUE_AS_PRIMARY_KEY1
)
WHERE T1.MY_TIMESTAMP_PARSED >= T2.MAX_TIMESTAMP AND
T1.VALUE_AS_PRIMARY_KEY2 = T2.VALUE_AS_PRIMARY_KEY2 AND
T1.VALUE_AS_PRIMARY_KEY3 = T2.VALUE_AS_PRIMARY_KEY3 AND
T1.VALUE_AS_PRIMARY_KEY4 = T2.VALUE_AS_PRIMARY_KEY4
EMIT CHANGES;
But this also resulted in an error: Invalid join condition: stream-table joins require to join on the table's primary key.
This answer is based on the comment made by vkm80 on reddit.
A proposed workaround is to derive a new column by concatenating the primary key column values into a single column. Here is what the working code looks like:
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM TEST_INITIAL_STREAM (
`VALUE_AS_PRIMARY_KEY1` STRING,
`VALUE_AS_PRIMARY_KEY2` STRING,
`VALUE_AS_PRIMARY_KEY3` STRING,
`VALUE_AS_PRIMARY_KEY4` STRING,
`ORDINARY_VALUE1` STRING,
`ORDINARY_VALUE2` STRING,
`MY_TIMESTAMP` STRING
) WITH (
KAFKA_TOPIC = 'example-topic1',
VALUE_FORMAT = 'JSON',
PARTITIONS = 3
);
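-- Derive COMBINED_KEYS by concatenating the four key columns; the '-' separator assumes the key values themselves never contain '-'.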
CREATE STREAM TEST_PARSED_TIMESTAMP_STREAM WITH (
KAFKA_TOPIC = 'example-topic2'
) AS SELECT
VALUE_AS_PRIMARY_KEY1,
VALUE_AS_PRIMARY_KEY2,
VALUE_AS_PRIMARY_KEY3,
VALUE_AS_PRIMARY_KEY4,
ORDINARY_VALUE1,
ORDINARY_VALUE2,
PARSE_TIMESTAMP(MY_TIMESTAMP, 'yyyy-MM-dd') AS MY_TIMESTAMP_PARSED,
CONCAT(VALUE_AS_PRIMARY_KEY1, '-', VALUE_AS_PRIMARY_KEY2, '-', VALUE_AS_PRIMARY_KEY3, '-', VALUE_AS_PRIMARY_KEY4) AS COMBINED_KEYS
FROM TEST_INITIAL_STREAM EMIT CHANGES;
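-- Aggregate the latest parsed timestamp per COMBINED_KEYS.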
CREATE TABLE TEST_MAX_TIME WITH (
KAFKA_TOPIC = 'example-topic3', FORMAT = 'JSON'
) AS SELECT
COMBINED_KEYS,
MAX(MY_TIMESTAMP_PARSED) AS MAX_TIMESTAMP
FROM TEST_PARSED_TIMESTAMP_STREAM
GROUP BY COMBINED_KEYS
EMIT CHANGES;
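-- Joining on the single COMBINED_KEYS column satisfies the requirement that a stream-table join uses the table's primary key.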
CREATE STREAM TEST_FILTERED_STREAM WITH (
KAFKA_TOPIC = 'example-topic4'
) AS SELECT
T1.VALUE_AS_PRIMARY_KEY1,
T1.VALUE_AS_PRIMARY_KEY2,
T1.VALUE_AS_PRIMARY_KEY3,
T1.VALUE_AS_PRIMARY_KEY4,
T1.ORDINARY_VALUE1,
T1.ORDINARY_VALUE2,
T1.MY_TIMESTAMP_PARSED,
T1.COMBINED_KEYS
FROM TEST_PARSED_TIMESTAMP_STREAM T1
LEFT JOIN TEST_MAX_TIME T2 ON T1.COMBINED_KEYS = T2.COMBINED_KEYS
WHERE T1.MY_TIMESTAMP_PARSED >= T2.MAX_TIMESTAMP
EMIT CHANGES;
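To sanity-check the result, one option (a minimal sketch, assuming the statements above have already been run) is to insert a couple of test rows and watch the filtered stream with a push query. The key and timestamp values below are made up for illustration:
-- Illustrative test rows (hypothetical values)
INSERT INTO TEST_INITIAL_STREAM (
VALUE_AS_PRIMARY_KEY1, VALUE_AS_PRIMARY_KEY2, VALUE_AS_PRIMARY_KEY3, VALUE_AS_PRIMARY_KEY4,
ORDINARY_VALUE1, ORDINARY_VALUE2, MY_TIMESTAMP
) VALUES ('k1', 'k2', 'k3', 'k4', 'v1', 'v2', '2023-01-01');
INSERT INTO TEST_INITIAL_STREAM (
VALUE_AS_PRIMARY_KEY1, VALUE_AS_PRIMARY_KEY2, VALUE_AS_PRIMARY_KEY3, VALUE_AS_PRIMARY_KEY4,
ORDINARY_VALUE1, ORDINARY_VALUE2, MY_TIMESTAMP
) VALUES ('k1', 'k2', 'k3', 'k4', 'v1', 'v2', '2023-01-02');
-- Only rows whose parsed timestamp is >= the current MAX_TIMESTAMP for that key should come through.
SELECT * FROM TEST_FILTERED_STREAM EMIT CHANGES LIMIT 2;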
That's it!