apache-flink flink-table-api

Headers with Flink Table API Kafka Sink


How does one send Kafka headers with the Table API in Flink? I used the SQL syntax to create a sink, but I'm not sure how to include custom headers.


Solution

  • What's currently possible is described in the Flink docs for the Kafka SQL connector. The only writable metadata fields are timestamp and headers; the headers are exposed as a map from string keys to raw bytes, i.e. MAP<STRING, BYTES>.

    For an example, see testKafkaSourceSinkWithMetadata from the Flink sources.

    Excerpting from that example:

    CREATE TABLE kafka (
      ...,
      `headers` MAP<STRING, BYTES> METADATA
    ) WITH (
      'connector' = 'kafka',
      ...
    )
    
    
    INSERT INTO kafka
    VALUES
      (..., MAP['k1', X'C0FFEE', 'k2', X'BABE01']),
      (..., CAST(NULL AS MAP<STRING, BYTES>)),
      (..., MAP['k1', X'102030', 'k2', X'203040'])
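
    The excerpt above elides the payload columns and uses hex literals for the header values. A fuller sketch with readable string headers might look like the following. The table name, topic, broker address, columns, and header keys are all made up for illustration, and it assumes the built-in ENCODE function is available in your Flink version to turn strings into bytes:

    ```sql
    -- Hypothetical sink table; topic, broker address, and payload columns are placeholders.
    CREATE TABLE orders_sink (
      `order_id` STRING,
      `quantity` INT,
      -- Writable metadata column. The column name matches the metadata key 'headers';
      -- with a different column name you would write: METADATA FROM 'headers'.
      `headers` MAP<STRING, BYTES> METADATA
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'orders',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json'
    );

    -- Header values must be bytes, so string values are encoded explicitly.
    INSERT INTO orders_sink
    VALUES
      ('o-1', 3, MAP['source', ENCODE('checkout', 'UTF-8'), 'trace-id', ENCODE('abc123', 'UTF-8')]);
    ```

    From the Table API, the same statements can be submitted with tableEnv.executeSql(...). The headers column is not part of the JSON payload; it should be written as Kafka record headers.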