
Flink SQL CLI: reading Kafka header records


I'm new to the Flink SQL CLI, and I want to create a sink from my Kafka cluster.

I've read the documentation and, as I understand it, the Kafka headers are exposed as a MAP<STRING, BYTES> metadata column, and they carry all the important information.

In the SQL CLI, I try to create a sink table with the following statement:

CREATE TABLE KafkaSink (
  `headers` MAP<STRING, BYTES> METADATA
) WITH (
  'connector' = 'kafka',
  'topic' = 'MyTopic',
  -- host:port of a broker; Kafka expects a port here (9092 is the usual default)
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'MyGroypID',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);
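For comparison, here is a sketch of a table that reads the message value and the Kafka metadata side by side, using the metadata keys the Flink Kafka connector documents (headers, offset, partition). The table name KafkaWithMetadata and the physical column `data` are hypothetical placeholders, and 'localhost:9092' is an assumed broker address; the remaining options are copied from the question.

```sql
-- Hypothetical table name; connector options mirror the question's statement.
CREATE TABLE KafkaWithMetadata (
  -- Physical column from the JSON value (hypothetical name, adjust to your payload).
  `data` STRING,
  -- Read-only metadata is marked VIRTUAL so Flink never tries to write it back.
  `partition` INT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  -- Kafka record headers: header key -> raw header bytes.
  `headers` MAP<STRING, BYTES> METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'MyTopic',
  -- Assumed broker address in host:port form.
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'MyGroypID',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);
```

The headers map only has entries if the producer attached Kafka headers to each record; fields that live inside the JSON value have to be declared as physical columns instead.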

But when I try to read the data with SELECT * FROM KafkaSink LIMIT 10; the records come back as null.


I've tried to run queries like

select headers.col1 from a limit 10;
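Because headers is declared as a MAP, a single entry is read with bracket syntax rather than dot syntax (dot access applies to ROW fields), and the BYTES value still has to be turned into text. A minimal sketch against the question's KafkaSink table, assuming a header key named col1 as in the query above and UTF-8 encoded header values:

```sql
-- MAP entries use bracket access; headers.col1 would only work on a ROW type.
-- A missing key yields NULL. CAST from BYTES to STRING decodes the bytes as
-- UTF-8 in recent Flink versions (assumption: the producer wrote UTF-8 text).
SELECT CAST(headers['col1'] AS STRING) AS col1
FROM KafkaSink
LIMIT 10;
```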

I've also tried declaring the headers column with different types in the column list:

...
`headers` STRING
...
...
`headers` MAP<STRING, STRING>
...
...
`headers` ROW(COL1 VARCHAR, COL2 VARCHAR...)
...

None of these return anything. However, when I also select the offset metadata column from the Kafka topic, the offsets come through but the headers do not.
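One way to check whether the records carry any Kafka headers at all (an empty map would explain why offsets appear while header lookups stay empty) is to count the entries per record. A minimal sketch against the question's KafkaSink table:

```sql
-- CARDINALITY(map) returns the number of entries in the map,
-- so a header_count of 0 means that record was produced without headers.
SELECT CARDINALITY(headers) AS header_count
FROM KafkaSink
LIMIT 10;
```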

Can someone explain what I'm doing wrong?

I want to create a Kafka sink with the Flink SQL CLI.


Solution

  • OK, when I changed the format to

    'format' = 'debezium-json'
    

    I could see the JSON much more clearly. I then followed its schema, which in my case was:

    {
    "data": {...},
    "metadata":{...}
    }
    

    So instead of reading the headers, I read the data field, which contains all the columns I need: data itself as a string, and the individual columns as, for example, data.col1 and data.col2.

    To see the records, a simple query like this is enough:

    SELECT
          JSON_VALUE(data, '$.Col1') AS Col1
    FROM KafkaSink;
    

    it works!
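The approach above can be sketched end to end as a table whose physical columns match the top-level JSON fields, followed by the JSON_VALUE query. This sketch swaps in the plain 'json' format instead of 'debezium-json', relying on Flink's JSON format rendering a nested object as JSON text when the column is declared STRING; the table name KafkaPayload and the broker address 'localhost:9092' are assumptions.

```sql
-- Hypothetical table name; columns follow the {"data": {...}, "metadata": {...}} layout.
CREATE TABLE KafkaPayload (
  -- Declared as STRING so the nested JSON object arrives as JSON text.
  `data` STRING,
  `metadata` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'MyTopic',
  -- Assumed broker address in host:port form.
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'MyGroypID',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

-- Extract one field from the JSON text; 'Col1' is the key used above.
SELECT JSON_VALUE(`data`, '$.Col1') AS Col1
FROM KafkaPayload
LIMIT 10;
```

If the field names are known up front, declaring `data` as ROW<Col1 STRING, Col2 STRING> instead would allow the dot access (data.Col1) mentioned above without calling JSON_VALUE.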