Tags: java, apache-kafka, apache-kafka-streams

Prevent Kafka Streams Consumer from writing offsets / wait for one stream to consume all records before starting the second stream


This is a two-in-one question regarding Kafka Streams.

I'm working on a service that consists of two streams. The first should consume an entire topic, receiving key/value pairs and storing their information in a local HashMap.

As soon as this stream has no lag anymore, the second stream starts and consumes another topic. For each consumed record, one of its attributes is looked up in the HashMap to decide whether the record is dropped or processed further. The first stream therefore needs to reach the end of its topic before the second one starts. I would like to keep the service stateless and thus avoid saving the data in a state store.
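
For illustration, the drop/forward logic of the second stream could look like the sketch below. This is only a sketch: the topic names and the extractAttribute helper are hypothetical placeholders, and the map is assumed to be filled by the first stream.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;

    public class SecondStreamSketch {

      // Filled by the first stream; concurrent because both streams access it
      static final Map<String, String> referenceData = new ConcurrentHashMap<>();

      static Topology buildSecondTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("data-topic")        // hypothetical topic name
            // Keep a record only if its attribute is known from the first topic
            .filter((key, value) -> referenceData.containsKey(extractAttribute(value)))
            .to("output-topic");                            // hypothetical topic name
        return builder.build();
      }

      // Hypothetical attribute extraction: here, the first comma-separated field
      static String extractAttribute(String value) {
        return value.split(",", 2)[0];
      }
    }

This creates two problems: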

  1. A normal Kafka consumer can be configured to never commit its offsets, so it consumes the topic from the beginning each time it is restarted (see the sketch after this item):
enable.auto.commit = false
auto.offset.reset = earliest

With Streams, however, this does not seem to work. My temporary workaround is to generate an application ID with a random part in order to ignore previously written offsets. This creates a new consumer group for each instance, resulting in many groups on the broker.

-> Is there a way to configure a streaming client to NOT write offsets?
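
For reference, a plain consumer set up this way never commits offsets for its group, so the "earliest" reset policy applies on every start. A minimal sketch; the bootstrap servers, group ID, and class name are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class RereadingConsumer {
      static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hashmap-loader");          // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No offsets are ever committed, so the reset policy applies on each restart
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
      }
    }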

  2. To make the second stream wait until the first topic is fully consumed, I tried different implementations. My last approach looks as follows:
      try {
        firstStreams.start();
        // Wait for the consumer to start and report a lag metric...
        while (computeLag(firstStreams.metrics().entrySet()) < 1) {
          Thread.sleep(100);
        }
        // ...then wait for the lag to reach 0
        while (computeLag(firstStreams.metrics().entrySet()) > 0) {
          Thread.sleep(100);
        }

        secondStream.start();
        shutdownLatch.await();
      } catch (Throwable e) {
        System.exit(1);
      }
      System.exit(0);
  
  private static double computeLag(Set<? extends Map.Entry<MetricName, ? extends Metric>> metrics) {
    return metrics.stream()
        // Sum the per-partition "records-lag" metrics of the consumer
        .filter(entry -> entry.getKey().name().equals("records-lag"))
        .mapToDouble(entry -> Double.parseDouble(entry.getValue().metricValue().toString()))
        .sum();
  }

computeLag returns 0.0 as long as the consumer is not yet running; after a successful start it returns the total lag summed over all partitions.

This version works, but it feels wrong and overly complicated.

-> Is there a proper way to wait for a stream to reach "the end"?

Thanks in advance and best regards


Solution

  • Kafka Streams provides the solution to my problem via GlobalKTable.

    "The GlobalKTable is fully bootstrapped upon (re)start of a KafkaStreams instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed."

    Source: Confluent Kafka Streams developer guide

    This has worked perfectly for the past few months.
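
    A minimal sketch of this approach, assuming string serdes; the topic names and the extractAttribute helper are hypothetical placeholders. The inner join against the GlobalKTable drops every record whose attribute has no matching reference entry, replacing the hand-rolled HashMap:

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.Topology;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.GlobalKTable;
        import org.apache.kafka.streams.kstream.KStream;

        public class GlobalTableSketch {

          static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();

            // Fully bootstrapped before processing starts, so no manual waiting
            GlobalKTable<String, String> reference = builder.globalTable(
                "reference-topic",                       // hypothetical topic name
                Consumed.with(Serdes.String(), Serdes.String()));

            KStream<String, String> data = builder.stream(
                "data-topic",                            // hypothetical topic name
                Consumed.with(Serdes.String(), Serdes.String()));

            data
                // The key mapper selects the attribute used for the lookup;
                // records without a match are dropped by the inner join
                .join(reference,
                    (key, value) -> extractAttribute(value),
                    (value, referenceValue) -> value)
                .to("output-topic");                     // hypothetical topic name

            return builder.build();
          }

          // Hypothetical attribute extraction, matching the sketch in the question
          static String extractAttribute(String value) {
            return value.split(",", 2)[0];
          }
        }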