How does one send kafka headers with the table api in Flink? I used the sql syntax to create a sink but not sure how to include custom headers.
What's currently possible is described here in the docs. The only writable metadata fields are the timestamp and the headers, where the headers are exposed as a map of strings to raw bytes.
For an example, see testKafkaSourceSinkWithMetadata from the Flink sources.
Excerpting from that example:
CREATE TABLE kafka (
...,
`headers` MAP<STRING, BYTES> METADATA
) WITH (
'connector' = 'kafka',
...
)
INSERT INTO kafka
VALUES
(..., MAP['k1', X'C0FFEE', 'k2', X'BABE01']),
(..., CAST(NULL AS MAP<STRING, BYTES>),
(..., MAP['k1', X'102030', 'k2', X'203040'])