apache-kafka, queue, distributed-computing, distributed-system, fault-tolerance

Fault-tolerant queue-worker architecture in Kafka?


I am new to queue-worker architectures and I'm interested in how to make them resilient to a worker failing. For example:

  1. We have a pool of workers, Alpha, that put entries onto queue A.
  2. Then a pool of workers, Beta, takes those entries off A, does some work on them, and puts them on queue B.

If a Beta worker takes an entry off A but crashes before writing it to B, how do we make sure the entry is not lost?

It seems that a Beta worker needs to be able to do two things (see the sketch after this list):

  1. claim an entry from the queue and mark it as in-progress so another worker doesn't take it, but not remove it from the queue, so that if the worker fails another worker can finish the job;
  2. mark the entry as done once the job is finished.
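In other words, something like this hypothetical interface (just a sketch of the semantics I have in mind, not a real API from Kafka or any particular queue library):

    // Hypothetical claim/ack interface, only to illustrate the semantics I am
    // after: entries stay on the queue until explicitly acknowledged, so if a
    // worker crashes its claim can expire and another worker can redo the job.
    interface ClaimableQueue<T> {
        // Hand out an entry and mark it in-progress for this worker; the claim
        // expires after claimTimeout if ack() is never called.
        Claim<T> claim(java.time.Duration claimTimeout);

        // Mark the entry as done and remove it from the queue for good.
        void ack(Claim<T> claim);
    }

    interface Claim<T> {
        T entry();
    }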

But as I understand it, modern queueing architectures often use tools like Kafka for the queue, and consuming an event seems to prevent it from being consumed again by another consumer in the same consumer group, so I don't see how a worker can view an event without effectively removing it from A.

How is this done in Kafka? Is there an API call I haven't found for marking an event as being owned by a worker but not removing it from Kafka?


Solution

  • OK, I did a lot more digging and found that Kafka has the concepts of the position and the committed position.

    According to the docs

    The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(Duration).

    whereas

    The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).
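    To make the distinction concrete, here is a small sketch of my own (not from the docs) that prints both values for every partition the consumer currently has assigned. The `Set`-based `committed` overload assumes a reasonably recent client (2.4+):

     import java.util.Collections;
     import org.apache.kafka.clients.consumer.KafkaConsumer;
     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
     import org.apache.kafka.common.TopicPartition;

     // Sketch: print both kinds of position for each assigned partition.
     // Assumes the consumer has already joined its group and polled at least
     // once, and that the committed(Set) overload is available (clients 2.4+).
     static void printPositions(KafkaConsumer<String, String> consumer) {
         for (TopicPartition tp : consumer.assignment()) {
             long position = consumer.position(tp);  // next offset poll() will hand out
             OffsetAndMetadata committed =
                     consumer.committed(Collections.singleton(tp)).get(tp);  // last stored offset
             System.out.printf("%s position=%d committed=%s%n",
                     tp, position, committed == null ? "none" : committed.offset());
         }
     }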

    And the docs give an example of manual offset control:

     import java.time.Duration;
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.List;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     // Disable auto-commit so offsets are only committed after the work is done.
     props.setProperty("enable.auto.commit", "false");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);    // the application-specific sink (defined elsewhere)
             consumer.commitSync();   // only now are these records considered consumed
             buffer.clear();
         }
     }
    

    The line insertIntoDb(buffer) is essentially the step that adds to B in my original example.
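    To map it back onto the question, here is a rough sketch of a Beta worker of my own (not from the docs) that consumes from topic A, produces the result to topic B, and only commits its offsets once the producer has confirmed the writes. The topic names, group id, serializer settings and doWork() are placeholders:

     import java.time.Duration;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.List;
     import java.util.Properties;
     import java.util.concurrent.Future;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;
     import org.apache.kafka.clients.producer.KafkaProducer;
     import org.apache.kafka.clients.producer.ProducerRecord;
     import org.apache.kafka.clients.producer.RecordMetadata;

     // Sketch of a Beta worker: topic names "A"/"B", the group id and doWork()
     // are placeholders for the real setup.
     public class BetaWorker {
         public static void main(String[] args) throws Exception {
             Properties consumerProps = new Properties();
             consumerProps.setProperty("bootstrap.servers", "localhost:9092");
             consumerProps.setProperty("group.id", "beta-workers");
             consumerProps.setProperty("enable.auto.commit", "false");  // commit only after B has the result
             consumerProps.setProperty("key.deserializer",
                     "org.apache.kafka.common.serialization.StringDeserializer");
             consumerProps.setProperty("value.deserializer",
                     "org.apache.kafka.common.serialization.StringDeserializer");

             Properties producerProps = new Properties();
             producerProps.setProperty("bootstrap.servers", "localhost:9092");
             producerProps.setProperty("key.serializer",
                     "org.apache.kafka.common.serialization.StringSerializer");
             producerProps.setProperty("value.serializer",
                     "org.apache.kafka.common.serialization.StringSerializer");

             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                  KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                 consumer.subscribe(Collections.singletonList("A"));
                 while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                     List<Future<RecordMetadata>> pending = new ArrayList<>();
                     for (ConsumerRecord<String, String> record : records) {
                         String result = doWork(record.value());  // the actual job
                         pending.add(producer.send(new ProducerRecord<>("B", record.key(), result)));
                     }
                     for (Future<RecordMetadata> f : pending) {
                         f.get();  // block until B has acknowledged every write
                     }
                     consumer.commitSync();  // only now is the batch considered consumed from A
                 }
             }
         }

         private static String doWork(String value) {
             return value;  // placeholder for the real transformation
         }
     }

    If the worker crashes anywhere before commitSync(), the uncommitted batch is simply re-delivered to whichever Beta worker takes over the partition, which is exactly the behaviour I was looking for.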

    The docs explain that this is exactly my use-case:

    If we allowed offsets to auto commit as in the previous example, records would be considered consumed after they were returned to the user in poll. It would then be possible for our process to fail after batching the records, but before they had been inserted into the database.

    To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility: the process could fail in the interval after the insert into the database but before the commit (even though this would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one time but in failure cases could be duplicated.
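    Since a batch can therefore show up more than once, one common way to make the duplicates harmless (my own note, not from the docs) is to key whatever the insert writes by coordinates Kafka already guarantees are unique per record, so a redelivered record is skipped or overwritten instead of being inserted twice:

     // Sketch: make the "insert into B / the database" idempotent so that
     // at-least-once redelivery does no harm. upsert() is a hypothetical
     // application-specific write that ignores or overwrites an existing key.
     for (ConsumerRecord<String, String> record : buffer) {
         String dedupKey = record.topic() + "-" + record.partition() + "-" + record.offset();
         // e.g. SQL: INSERT ... ON CONFLICT (dedup_key) DO NOTHING
         // or a key-value store: putIfAbsent(dedupKey, record.value())
         upsert(dedupKey, record.value());
     }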

    Nice.