Before sending an Avro GenericRecord to Kafka, I insert a header like so:
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, key, message);
record.headers().add("schema", schema);
Consuming the record.
When using Spark Streaming, the header from the ConsumerRecord is intact.
KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).foreachRDD(rdd -> {
    rdd.foreach(record -> {
        // Print the value of the first "schema" header on the record
        System.out.println(new String(record.headers().headers("schema").iterator().next().value()));
    });
});
But when using Spark SQL Streaming, the header seems to be missing.
StreamingQuery query = dataset.writeStream().foreach(new ForeachWriter<>() {
    public void process(Row row) {
        String topic = (String) row.get(2);
        int partition = (int) row.get(3);
        long offset = (long) row.get(4);
        String key = new String((byte[]) row.get(0));
        byte[] value = (byte[]) row.get(1);
        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>(topic, partition, offset, key,
        //I need the schema to decode the Avro!
Where can I find the custom header value when using the Spark SQL Streaming approach?
I tried 3.0.0-preview2 of spark-sql_2.12 and spark-sql-kafka-0-10_2.12. I added
.option("includeHeaders", true)
But I still only get these columns from the Row.
Kafka headers in Structured Streaming are supported only from Spark 3.0 onward.
Look for the includeHeaders option in the Structured Streaming + Kafka Integration Guide for more details.
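As a rough illustration of what to do once the headers are available: with includeHeaders enabled on the Kafka source in Spark 3.0, each row carries an extra headers column holding an array of key/value structs. The sketch below is plain Java and does not touch Spark. It models those structs as a list of Map.Entry<String, byte[]> (an assumption, made so the example runs standalone), and HeaderLookup and firstHeaderAsString are hypothetical names, not part of Spark or Kafka. It returns the first header stored under a given key, decoded as UTF-8, similar to the iterator().next() call in the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Spark or Kafka.
class HeaderLookup {

    // Returns the first non-null header value stored under `key`, decoded as UTF-8.
    // Returns Optional.empty() when the list is null or no matching header is found.
    public static Optional<String> firstHeaderAsString(
            List<Map.Entry<String, byte[]>> headers, String key) {
        if (headers == null) {
            // Guard for records where no header list is available.
            return Optional.empty();
        }
        for (Map.Entry<String, byte[]> header : headers) {
            if (key.equals(header.getKey()) && header.getValue() != null) {
                return Optional.of(new String(header.getValue(), StandardCharsets.UTF_8));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the key/value structs of the headers column (assumed shape).
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        headers.add(new SimpleImmutableEntry<>("trace", "abc".getBytes(StandardCharsets.UTF_8)));
        headers.add(new SimpleImmutableEntry<>("schema",
                "{\"type\":\"string\"}".getBytes(StandardCharsets.UTF_8)));

        System.out.println(firstHeaderAsString(headers, "schema").orElse("<no schema header>"));
    }
}
```

Inside a ForeachWriter, the header structs could probably be read with something like row.getList(row.fieldIndex("headers")), where each element is itself a Row holding the key and the binary value. Treat that mapping as an assumption to verify against your Spark version. Note also that includeHeaders is an option of the Kafka source, so it belongs on the readStream() side rather than on writeStream().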