Let's say I want to use a Kafka source for my Flink SQL job, which is managed by Aiven.
How can I get access to the key of the message?
Problem:
I'm producing messages on the source topic with a bit of Java, and the Kafka messages look like this (I'm using Avro for the value of the message and a simple string for the key):
{
  key: "key12",                << string
  topic: "my_source_topic",
  value: { ... content ... }   << avro
}
Everything seems to be serialised properly. I can see the keys of the messages in the topic my_source_topic when using the Aiven console, for example.
I'm expecting Flink to sink it into my other topic my_sink_topic... which works... but the keys are all null!
More details:
My source definition looks like this:
CREATE TABLE my_source (
  name STRING,
  the_kafka_key STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '',
  'scan.startup.mode' = 'earliest-offset',
  'topic' = 'my_source_topic',
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '...',
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = '...'
)
My sink looks like this:
CREATE TABLE my_sink (
  name STRING,
  key STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '',
  'scan.startup.mode' = 'earliest-offset',
  'topic' = 'my_sink_topic',
  'key.format' = 'raw',
  'key.fields' = 'key',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '...',
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = '...'
)
My Flink SQL statement (is that the correct word?) looks like this:
INSERT INTO my_sink
SELECT
  my_source.name AS name,
  my_source.the_kafka_key AS key
FROM my_source
When I look into my_source_topic, I can see the keys. When I look into my_sink_topic, I cannot see the keys! They are null! The message itself has been correctly decoded, though...
What am I missing? What can I do to check what is wrong?
The code you provided has some naming issues, probably just copy-pasted from different versions? You are doing mytopic.name AS name
but your source table is named my_source,
and INSERT INTO mysink
while your sink table is named my_sink.
I think you need to add 'value.fields-include' = 'EXCEPT_KEY'
to your table definitions, because your key and value fields have different data types. More info here: Overlapping Format Fields
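Applied to your source table from the question, it would look roughly like this (a sketch: only the last option is new, everything else stays exactly as you posted it, '...' placeholders included). Add the same option to my_sink as well:

CREATE TABLE my_source (
  name STRING,
  the_kafka_key STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '',
  'scan.startup.mode' = 'earliest-offset',
  'topic' = 'my_source_topic',
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '...',
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = '...',
  -- new: key columns are no longer expected inside the Avro value
  'value.fields-include' = 'EXCEPT_KEY'
)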
Example:
# we send 4 lines to topic TEST
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"3;2:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"abv;gd:1;2;beh"
-- creating the source table
CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` STRING,
  `item_id` STRING,
  `user` BIGINT,
  `item` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_key',
  'properties.auto.offset.reset' = 'earliest',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'csv',
  'key.csv.ignore-parse-errors' = 'true',
  'key.csv.field-delimiter' = ';',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'csv',
  'value.csv.ignore-parse-errors' = 'true',
  'value.csv.field-delimiter' = ';',
  'value.fields-include' = 'ALL'
)
Output 1 (with 'value.fields-include' = 'ALL'):
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op | ts | user_id | item_id | user | item | behavior |
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 2024-10-03 15:54:35.668 | 1 | 2 | (NULL) | (NULL) | (NULL) |
| +I | 2024-10-03 16:10:56.387 | 1 | 2 | (NULL) | (NULL) | (NULL) |
| +I | 2024-10-03 16:12:14.891 | 1 | 2 | (NULL) | (NULL) | (NULL) |
| +I | 2024-10-03 16:14:36.171 | 1 | 2 | (NULL) | (NULL) | (NULL) |
Then we set 'value.fields-include' = 'EXCEPT_KEY'
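Concretely, the only line that changes in the CREATE TABLE above is the last one (a minimal sketch, the rest of the WITH clause stays as shown):

  ...
  'value.csv.ignore-parse-errors' = 'true',
  'value.csv.field-delimiter' = ';',
  'value.fields-include' = 'EXCEPT_KEY'
)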
Output OK:
# key and value are both shown correctly
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op | ts | user_id | item_id | user | item | behavior |
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 2024-10-03 15:54:35.668 | usrid | itId | 1 | 2 | beh |
| +I | 2024-10-03 16:10:56.387 | usrid | itId | 1 | 2 | beh |
| +I | 2024-10-03 16:12:14.891 | 3 | 2 | 1 | 2 | beh |
| +I | 2024-10-03 16:14:36.171 | abv | gd | 1 | 2 | beh |