apache-kafka confluent-platform ksqldb

How to use multi-condition LEFT JOINs in a Confluent ksqlDB query?


I have a ksqlDB table with a multi-column (composite) primary key. I need to create a new stream and LEFT JOIN it with that table. However, it appears ksqlDB does not support multi-condition LEFT JOINs.

ksqlDB version: 0.29.0

My code is as follows:

  1. Setting the offset to earliest
SET 'auto.offset.reset' = 'earliest';
  2. Creating the initial stream
CREATE STREAM TEST_INITIAL_STREAM (
    `VALUE_AS_PRIMARY_KEY1` STRING,
    `VALUE_AS_PRIMARY_KEY2` STRING,
    `VALUE_AS_PRIMARY_KEY3` STRING,
    `VALUE_AS_PRIMARY_KEY4` STRING,
    `ORDINARY_VALUE1` STRING,
    `ORDINARY_VALUE2` STRING,
    `MY_TIMESTAMP` STRING
) WITH (
    KAFKA_TOPIC = 'example-topic1',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 3
);
  3. Parsing the timestamp
CREATE STREAM TEST_PARSED_TIMESTAMP_STREAM WITH (
    KAFKA_TOPIC = 'example-topic2'
) AS SELECT
    VALUE_AS_PRIMARY_KEY1,
    VALUE_AS_PRIMARY_KEY2,
    VALUE_AS_PRIMARY_KEY3,
    VALUE_AS_PRIMARY_KEY4,
    ORDINARY_VALUE1,
    ORDINARY_VALUE2,
    PARSE_TIMESTAMP(MY_TIMESTAMP, 'yyyy-MM-dd') AS MY_TIMESTAMP_PARSED
FROM TEST_INITIAL_STREAM EMIT CHANGES;
  4. Creating the table to hold the maximum timestamp per unique combination of primary keys
CREATE TABLE TEST_MAX_TIME WITH (
    KAFKA_TOPIC = 'example-topic3', FORMAT = 'JSON'
) AS SELECT
    VALUE_AS_PRIMARY_KEY1,
    VALUE_AS_PRIMARY_KEY2,
    VALUE_AS_PRIMARY_KEY3,
    VALUE_AS_PRIMARY_KEY4,
    MAX(MY_TIMESTAMP_PARSED) AS MAX_TIMESTAMP
FROM TEST_PARSED_TIMESTAMP_STREAM
GROUP BY VALUE_AS_PRIMARY_KEY1, VALUE_AS_PRIMARY_KEY2, VALUE_AS_PRIMARY_KEY3, VALUE_AS_PRIMARY_KEY4
EMIT CHANGES;
  5. And finally, I tried to create a filtered stream
CREATE STREAM TEST_FILTERED_STREAM WITH (
    KAFKA_TOPIC = 'example-topic4'
) AS SELECT
    T1.VALUE_AS_PRIMARY_KEY1,
    T1.VALUE_AS_PRIMARY_KEY2,
    T1.VALUE_AS_PRIMARY_KEY3,
    T1.VALUE_AS_PRIMARY_KEY4,
    T1.ORDINARY_VALUE1,
    T1.ORDINARY_VALUE2,
    T1.MY_TIMESTAMP_PARSED
FROM TEST_PARSED_TIMESTAMP_STREAM T1
LEFT JOIN TEST_MAX_TIME T2 ON (
    T1.VALUE_AS_PRIMARY_KEY1 = T2.VALUE_AS_PRIMARY_KEY1 AND
    T1.VALUE_AS_PRIMARY_KEY2 = T2.VALUE_AS_PRIMARY_KEY2 AND
    T1.VALUE_AS_PRIMARY_KEY3 = T2.VALUE_AS_PRIMARY_KEY3 AND
    T1.VALUE_AS_PRIMARY_KEY4 = T2.VALUE_AS_PRIMARY_KEY4
)
WHERE T1.MY_TIMESTAMP_PARSED >= T2.MAX_TIMESTAMP
EMIT CHANGES;

However, this fails with an Unsupported join expression error. What is the appropriate way to perform a LEFT JOIN against a table with a multi-column primary key?

In addition, I tried keeping one condition in the ON clause and moving the other three conditions to the WHERE clause, like so:

CREATE STREAM TEST_FILTERED_STREAM WITH (
    KAFKA_TOPIC = 'example-topic4'
) AS SELECT
    T1.VALUE_AS_PRIMARY_KEY1,
    T1.VALUE_AS_PRIMARY_KEY2,
    T1.VALUE_AS_PRIMARY_KEY3,
    T1.VALUE_AS_PRIMARY_KEY4,
    T1.ORDINARY_VALUE1,
    T1.ORDINARY_VALUE2,
    T1.MY_TIMESTAMP_PARSED
FROM TEST_PARSED_TIMESTAMP_STREAM T1
LEFT JOIN TEST_MAX_TIME T2 ON (
    T1.VALUE_AS_PRIMARY_KEY1 = T2.VALUE_AS_PRIMARY_KEY1
)
WHERE T1.MY_TIMESTAMP_PARSED >= T2.MAX_TIMESTAMP AND
    T1.VALUE_AS_PRIMARY_KEY2 = T2.VALUE_AS_PRIMARY_KEY2 AND
    T1.VALUE_AS_PRIMARY_KEY3 = T2.VALUE_AS_PRIMARY_KEY3 AND
    T1.VALUE_AS_PRIMARY_KEY4 = T2.VALUE_AS_PRIMARY_KEY4
EMIT CHANGES;

But this also resulted in an error: Invalid join condition: stream-table joins require to join on the table's primary key. In hindsight this makes sense: grouping by four columns gives TEST_MAX_TIME a composite primary key made up of all four columns, so an ON clause that covers only one of them cannot match it.
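The key columns of the aggregate table can be checked with a DESCRIBE statement; the four grouped columns should all be listed as key columns:

-- Show the schema of TEST_MAX_TIME, including its key columns.
DESCRIBE TEST_MAX_TIME;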


Solution

  • This answer is based on a comment made by vkm80 on Reddit.

    A proposed workaround is to derive a single key column by concatenating the primary-key column values. Here is what the working code looks like:

    1. Setting the offset to earliest
    SET 'auto.offset.reset' = 'earliest';
    
    2. Creating the initial stream
    CREATE STREAM TEST_INITIAL_STREAM (
        `VALUE_AS_PRIMARY_KEY1` STRING,
        `VALUE_AS_PRIMARY_KEY2` STRING,
        `VALUE_AS_PRIMARY_KEY3` STRING,
        `VALUE_AS_PRIMARY_KEY4` STRING,
        `ORDINARY_VALUE1` STRING,
        `ORDINARY_VALUE2` STRING,
        `MY_TIMESTAMP` STRING
    ) WITH (
        KAFKA_TOPIC = 'example-topic1',
        VALUE_FORMAT = 'JSON',
        PARTITIONS = 3
    );
    
    3. Parsing the timestamp (added a COMBINED_KEYS column)
    CREATE STREAM TEST_PARSED_TIMESTAMP_STREAM WITH (
        KAFKA_TOPIC = 'example-topic2'
    ) AS SELECT
        VALUE_AS_PRIMARY_KEY1,
        VALUE_AS_PRIMARY_KEY2,
        VALUE_AS_PRIMARY_KEY3,
        VALUE_AS_PRIMARY_KEY4,
        ORDINARY_VALUE1,
        ORDINARY_VALUE2,
        PARSE_TIMESTAMP(MY_TIMESTAMP, 'yyyy-MM-dd') AS MY_TIMESTAMP_PARSED,
        CONCAT(VALUE_AS_PRIMARY_KEY1, '-', VALUE_AS_PRIMARY_KEY2, '-', VALUE_AS_PRIMARY_KEY3, '-', VALUE_AS_PRIMARY_KEY4) AS COMBINED_KEYS
    FROM TEST_INITIAL_STREAM EMIT CHANGES;
    
    4. Creating the table to hold the maximum timestamp per unique combination of primary keys (now grouped by COMBINED_KEYS)
    CREATE TABLE TEST_MAX_TIME WITH (
        KAFKA_TOPIC = 'example-topic3', FORMAT = 'JSON'
    ) AS SELECT
        COMBINED_KEYS,
        MAX(MY_TIMESTAMP_PARSED) AS MAX_TIMESTAMP
    FROM TEST_PARSED_TIMESTAMP_STREAM
    GROUP BY COMBINED_KEYS
    EMIT CHANGES;
    
    5. And finally, I created a filtered stream
    CREATE STREAM TEST_FILTERED_STREAM WITH (
        KAFKA_TOPIC = 'example-topic4'
    ) AS SELECT
        T1.VALUE_AS_PRIMARY_KEY1,
        T1.VALUE_AS_PRIMARY_KEY2,
        T1.VALUE_AS_PRIMARY_KEY3,
        T1.VALUE_AS_PRIMARY_KEY4,
        T1.ORDINARY_VALUE1,
        T1.ORDINARY_VALUE2,
        T1.MY_TIMESTAMP_PARSED,
        T1.COMBINED_KEYS
    FROM TEST_PARSED_TIMESTAMP_STREAM T1
    LEFT JOIN TEST_MAX_TIME T2 ON T1.COMBINED_KEYS = T2.COMBINED_KEYS
    WHERE T1.MY_TIMESTAMP_PARSED >= T2.MAX_TIMESTAMP
    EMIT CHANGES;
    

    That's it!
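
    As a quick smoke test, one option is to insert a row into the source stream and watch it arrive in the filtered stream; the values below are made up for illustration:

    -- Produce one test row into the source stream (illustrative values).
    INSERT INTO TEST_INITIAL_STREAM (
        VALUE_AS_PRIMARY_KEY1, VALUE_AS_PRIMARY_KEY2,
        VALUE_AS_PRIMARY_KEY3, VALUE_AS_PRIMARY_KEY4,
        ORDINARY_VALUE1, ORDINARY_VALUE2, MY_TIMESTAMP
    ) VALUES ('a', 'b', 'c', 'd', 'v1', 'v2', '2024-01-15');

    -- Once TEST_MAX_TIME has been updated for key 'a-b-c-d', the row
    -- should appear here, since its parsed timestamp satisfies the
    -- >= MAX_TIMESTAMP filter.
    SELECT * FROM TEST_FILTERED_STREAM EMIT CHANGES;

    One caveat with the workaround itself: using '-' as the delimiter assumes the key values never contain '-'. If they can, two different key combinations could collapse into the same COMBINED_KEYS string, so pick a delimiter (or an escaping scheme) that cannot appear in the data.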