apache-kafka, apache-kafka-streams

Why does my Kafka Streams topology not replay/reprocess correctly?


I have a topology that looks like this:

// The users table, materialized from the USERS topic
KTable<ByteString, User> users = topology.table(USERS);

// Create a new user for every join request and write it back to USERS
topology.stream(JOIN_REQUESTS)
    .mapValues(entityTopologyProcessor::userNew)
    .to(USERS);

// Confirm a user's settings by joining confirm requests against the table
topology.stream(SETTINGS_CONFIRM_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsConfirm)
    .to(USERS);

// Update a user's settings by joining update requests against the table
topology.stream(SETTINGS_UPDATE_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsUpdate)
    .to(USERS);

At runtime this topology works fine. Users are created with join requests. They confirm their settings with settings confirm requests. They update their settings with settings update requests.

However, reprocessing this topology does not reproduce the original results. Specifically, the settings-update joiner does not see the user produced by the settings-confirm joiner, even though, going by the timestamps, many seconds elapse from the time the user is created, to the time the user is confirmed, to the time the user updates their settings.

I'm at a loss. I've tried turning off caching and logging on the user table, but I have no idea how to make this reprocess properly.
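
For reference, disabling caching and logging looked roughly like this (a sketch; "users-store" is a made-up store name):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Materialize the users table with caching and changelogging turned off
KTable<ByteString, User> users = topology.table(
    USERS,
    Materialized.<ByteString, User, KeyValueStore<Bytes, byte[]>>as("users-store")
        .withCachingDisabled()
        .withLoggingDisabled());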


Solution

  • Update

    Over the years, a lot of effort has gone into making Kafka Streams more deterministic.

    Kafka Streams 2.1 added the max.task.idle.ms config to help "synchronize" reading from different input topics (KIP-353), which was further improved in the 3.0 release (KIP-695).
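
    For example, a minimal sketch of setting this config (application id and bootstrap servers are placeholders):

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-entity-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Let a task wait up to 1 second for data on all of its input partitions
        // before processing, so records can be picked in timestamp order across topics.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);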

    For stream-table joins in particular, versioned state stores (added in 3.5: KIP-889) can now be leveraged together with a stream-table join "grace period" (added in 3.6: KIP-923) to provide deterministic and temporally accurate join results.
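
    Applied to the topology above, that could look roughly like this (a sketch for Kafka Streams 3.6+; the store name, retention, grace period, and the SettingsUpdateRequest type are assumptions):

        import java.time.Duration;
        import org.apache.kafka.streams.kstream.Joined;
        import org.apache.kafka.streams.kstream.KTable;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.state.Stores;

        // Back the users table with a versioned store (KIP-889); the one-hour
        // history retention is an assumption
        KTable<ByteString, User> users = topology.table(
            USERS,
            Materialized.as(
                Stores.persistentVersionedKeyValueStore("users-versioned", Duration.ofHours(1))));

        // Give the stream-table join a grace period (KIP-923): stream records are
        // buffered and joined against the table version matching their timestamp
        topology.stream(SETTINGS_UPDATE_REQUESTS)
            .join(users,
                  entityTopologyProcessor::userSettingsUpdate,
                  Joined.<ByteString, SettingsUpdateRequest, User>as("settings-update-join")
                        .withGracePeriod(Duration.ofMinutes(1)))
            .to(USERS);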

    Original Answer

    A KStream-KTable join is not 100% deterministic (and might never become 100% deterministic). We are aware of the problem and are discussing solutions to at least mitigate the issue.

    One problem is that when a consumer fetches from the brokers, we cannot easily control for which topics and/or partitions the broker returns data. Depending on the order in which we receive data from the broker, the result might differ slightly.

    One related issue: https://issues.apache.org/jira/browse/KAFKA-3514

    This blog post might help, too: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/