java, apache-kafka, apache-flink, flink-streaming

Read a keyed Kafka record using Apache Flink?


I'm producing keyed (key + value) records with the Kafka console producer:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092

But I'm finding it hard to understand how to read those keyed records with Flink's KafkaSource consumer. I want to be able to do things like:

record.getValue(), record.getKey(), record.getTimestamp()...

This is my current code, which only reads the record values (no keys) from Kafka:

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(ip)
        .setTopics("test3")
        .setGroupId("1")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();

Can I get an example of what I'm looking for?


Solution

  • You need to implement your own KafkaRecordDeserializationSchema (rather than using valueOnly). In its deserialize method you'll have access to the full ConsumerRecord, and you can work with its key, value, headers, etc. to produce whatever type you want; a rough sketch is included at the end of this answer.

    There's an example in Reading Apache Kafka® headers, which is part of the Apache Flink Cookbook. Note that while that example accesses the topic, partition, offset, and timestamp from the record's headers, it doesn't use the key, which is available as record.key().

    Note: I work for Confluent.
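    For illustration, here's a minimal sketch of what such a custom deserializer could look like. It assumes the console producer wrote both key and value as UTF-8 strings; the KafkaRecord POJO and the class names are my own invention, not the Cookbook's:

    // KafkaRecord.java -- simple POJO holding the parts of the record we want downstream
    public class KafkaRecord {
        public String key;
        public String value;
        public long timestamp;

        public KafkaRecord() {}

        public KafkaRecord(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // KeyedStringDeserializationSchema.java -- unlike valueOnly(...), this sees the whole
    // ConsumerRecord, so the key, timestamp, headers, partition, etc. are all available
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class KeyedStringDeserializationSchema implements KafkaRecordDeserializationSchema<KafkaRecord> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<KafkaRecord> out) throws IOException {
            // The console producer writes key and value as UTF-8 strings
            String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
            String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
            out.collect(new KafkaRecord(key, value, record.timestamp()));
        }

        @Override
        public TypeInformation<KafkaRecord> getProducedType() {
            return TypeInformation.of(KafkaRecord.class);
        }
    }

    Wired into your existing job, the builder and map would then look roughly like this:

    KafkaSource<KafkaRecord> source = KafkaSource.<KafkaRecord>builder()
            .setBootstrapServers(ip)
            .setTopics("test3")
            .setGroupId("1")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(new KeyedStringDeserializationSchema())
            .build();

    DataStream<KafkaRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    stream.map((MapFunction<KafkaRecord, String>) r ->
            "Receiving from Kafka : key=" + r.key + ", value=" + r.value + ", timestamp=" + r.timestamp).print();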