
FlinkSQL: access to kafka key

I want to use a Kafka source in my Flink SQL job, which is managed by Aiven.

How can I get access to the key of the message?


I'm producing messages on the source topic with a bit of Java. The Kafka messages look like this (Avro for the value of the message, a simple string for the key):

  key: "key12",               << string
  topic: "my_source_topic",
  value: { ... content ... }  << avro

Everything seems to be serialised properly. For example, I can see the keys of the messages in the topic my_source_topic when using the Aiven console.

I'm expecting Flink to sink the messages into my other topic, my_sink_topic. That works, but the keys are all null!

More details:

my source definition looks like this:

CREATE TABLE my_source (
    name STRING,
    the_kafka_key STRING 
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'my_source_topic',
    'key.format' = 'raw',
    'key.fields' = 'the_kafka_key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '...', 
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '...'
);

my sink looks like this:

CREATE TABLE my_sink (
    name STRING,
    key STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'my_sink_topic',
    'key.format' = 'raw',
    'key.fields' = 'key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '...',
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '...'
);

my Flink SQL statement (is that the correct word?) looks like this:

INSERT INTO my_sink
SELECT
    my_source.name AS name,
    my_source.the_kafka_key AS key
FROM my_source

When I look into my_source_topic, I can see the keys. When I look into my_sink_topic, I cannot see the keys: they are null! The message value itself has been correctly decoded, though.

What am I missing? What can I do to check what is wrong?


  • The code you provided has some naming issues, probably from copy-pasting different versions. You select mytopic.name AS name but your source table is named my_source, and you INSERT INTO mysink while your sink table is named my_sink.

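    I think you need to add 'value.fields-include' = 'EXCEPT_KEY' to your table definitions. The default is 'ALL', which means the value format is expected to contain every column of the table, including the key column, and your key column is not part of the Avro value. More info in the "Overlapping Format Fields" section of the Flink Kafka connector documentation.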
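
    Applied to the tables in the question, the change could look roughly like the sketch below. The only new line in each definition is 'value.fields-include'. The elided values ('' and '...') are left exactly as posted. The statement terminators, the INSERT INTO my_sink target and the backticks around `key` are my assumptions (the backticks are only a precaution in case the parser treats the word as a keyword).

    ```sql
    -- Sketch only: the question's tables plus 'value.fields-include' = 'EXCEPT_KEY'.
    CREATE TABLE my_source (
        name STRING,
        the_kafka_key STRING
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '',
        'scan.startup.mode' = 'earliest-offset',
        'topic' = 'my_source_topic',
        'key.format' = 'raw',
        'key.fields' = 'the_kafka_key',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '...',
        'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'value.avro-confluent.basic-auth.user-info' = '...',
        -- key columns are read from the message key only, not from the Avro value
        'value.fields-include' = 'EXCEPT_KEY'
    );

    CREATE TABLE my_sink (
        name STRING,
        `key` STRING  -- backticks are an assumption, see the note above
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '',
        'scan.startup.mode' = 'earliest-offset',
        'topic' = 'my_sink_topic',
        'key.format' = 'raw',
        'key.fields' = 'key',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '...',
        'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'value.avro-confluent.basic-auth.user-info' = '...',
        -- the key column is written to the message key only, not into the Avro value
        'value.fields-include' = 'EXCEPT_KEY'
    );

    INSERT INTO my_sink
    SELECT
        my_source.name AS name,
        my_source.the_kafka_key AS `key`
    FROM my_source;
    ```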


    # we send 4 lines to topic TEST
    kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
    kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
    kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"3;2:1;2;beh"
    kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"abv;gd:1;2;beh"

    -- create the source table
    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
      `user_id` STRING,
      `item_id` STRING,
      `user` BIGINT,
      `item` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'TEST',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'test_key',
      'properties.auto.offset.reset' = 'earliest',
      'scan.startup.mode' = 'earliest-offset',
      'key.format' = 'csv',
      'key.csv.ignore-parse-errors' = 'true',
      'key.csv.field-delimiter' = ';',
      'key.fields' = 'user_id;item_id',
      'value.format' = 'csv',
      'value.csv.ignore-parse-errors' = 'true',
      'value.csv.field-delimiter' = ';',
      'value.fields-include' = 'ALL'
    );

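    Output 1 (with 'value.fields-include' = 'ALL'): the key columns are filled from the value and the remaining columns are NULL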

    | op |                      ts |                        user_id |                        item_id |                 user |                 item |                       behavior |
    | +I | 2024-10-03 15:54:35.668 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |
    | +I | 2024-10-03 16:10:56.387 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |
    | +I | 2024-10-03 16:12:14.891 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |
    | +I | 2024-10-03 16:14:36.171 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |

    Then we set 'value.fields-include' = 'EXCEPT_KEY'.

    Output OK:

    # key and value are both shown correctly
    | op |                      ts |                        user_id |                        item_id |                 user |                 item |                       behavior |
    | +I | 2024-10-03 15:54:35.668 |                          usrid |                           itId |                    1 |                    2 |                            beh |
    | +I | 2024-10-03 16:10:56.387 |                          usrid |                           itId |                    1 |                    2 |                            beh |
    | +I | 2024-10-03 16:12:14.891 |                              3 |                              2 |                    1 |                    2 |                            beh |
    | +I | 2024-10-03 16:14:36.171 |                            abv |                             gd |                    1 |                    2 |                            beh |
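
    To check where the key gets lost in your own setup, one option is to query both tables from the SQL client, the same way the outputs above were produced. This is only a sketch using the table and column names from the question. The backticks around `key` are my assumption, added as a precaution in case the parser treats the word as a keyword.

    ```sql
    -- Step 1: does Flink see the key when reading the source topic?
    -- If the_kafka_key is already NULL here, the problem is in the source table definition.
    SELECT the_kafka_key, name FROM my_source;

    -- Step 2: what does Flink read back from the sink topic?
    -- If step 1 shows keys but this shows NULL, look at the sink definition or the INSERT statement.
    SELECT `key`, name FROM my_sink;
    ```

    Checking the source first narrows things down, because a key that is NULL at the source will necessarily be NULL at the sink.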