apache-flink, flink-streaming

setGroupId not working in Flink streaming job


I run Flink on my laptop with this command:

./bin/start-cluster.sh

and I am writing a simple Flink job that reads data from one topic and writes the same data to another.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ThirdJob {

    public static void main(String[] args) throws Exception {
        String kafkaTopicSink = "test10";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read strings from the "test9" topic with group ID "test160".
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test9")
                .setGroupId("test160")
                .setClientIdPrefix("x_x_x_x")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> myStream =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "read_from_kafka");

        // Person, PersonMapper, and PersonSerializationSchema are my own classes (not shown).
        DataStream<Person> objectStream = myStream.map(new PersonMapper()).name("Map_each_record_to_the_person_object");
        objectStream.print().name("Print_the_kafka_message");

        // Sink: write the mapped records to the "test10" topic.
        KafkaSink<Person> myKafkaSink = KafkaSink.<Person>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaTopicSink)
                        .setValueSerializationSchema(new PersonSerializationSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        objectStream.sinkTo(myKafkaSink).name("write_data_to_the_kafka");

        env.execute();
    }
}

The job works correctly, but when I list the consumer groups, I don't see my group ID. I am using this command to show the consumer group list:

./kafka-consumer-groups  --list --bootstrap-server localhost:9092

Does Flink use consumer groups, or do I need some additional configuration?


Solution

  • Do you have checkpointing enabled on the job?

    If not, you'll likely want it. Checkpointing is Flink's primary fault-tolerance mechanism and is the cornerstone of how operations such as committing Kafka offsets function. Flink manages the appropriate offsets behind the scenes and stores them internally as part of its checkpointed state, rather than relying solely on Kafka consumer groups, and it only sends requests to commit them to Kafka during checkpointing.

    You can add this via the enableCheckpointing() function as seen below:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // Enable checkpointing at some defined interval (30s)
    env.enableCheckpointing(30000);
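
    With checkpointing on, the KafkaSource commits its current offsets back to Kafka as part of each successful checkpoint, so the group ID should then appear in the kafka-consumer-groups --list output. Below is a minimal sketch of the source side with both pieces together; it reuses the broker, topic, and group ID from the question, and the connector's commit.offsets.on.checkpoint property (which defaults to true) is set explicitly here only to make the behavior visible:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedKafkaRead {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 30 seconds; offsets are committed to Kafka
            // as part of each successful checkpoint.
            env.enableCheckpointing(30000);

            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("test9")
                    .setGroupId("test160")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // Defaults to true; shown explicitly. With checkpointing disabled,
                    // no offsets are committed and the group never appears in the listing.
                    .setProperty("commit.offsets.on.checkpoint", "true")
                    .build();

            env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "read_from_kafka")
                    .print();

            env.execute();
        }
    }

    Once a checkpoint has completed, running ./kafka-consumer-groups --describe --group test160 --bootstrap-server localhost:9092 should show the committed offsets. Note that Flink treats these commits as informational (useful for lag monitoring); on recovery it restores from the offsets in its own checkpointed state, not from the consumer group.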