I have a general idea of how it works but I just want to confirm what I've grokked from the docs is correct (particularly in a cluster).
Say I have 4 task manager instances. `keyBy(userId)` partitions the stream across those 4 task managers such that each has its own respective key space that it is responsible for (a key group). Say `001-333`, `334-555`, `556-777`, and `778-999`, where the total `userId` key space is `001-999`. I understand the hashing and key grouping is handled behind the scenes; these groups are just for the sake of clarity.

Is this roughly correct? I'm a little confused as to how arbitrary keys make it to their respective workers, given that each key must be handled by the same instance and slot for stateful handling.
Is the part that I'm missing here that sources, operators, and sinks can scale independently? Meaning the KafkaSource processes are responsible for hashing the key and routing it over to the correct downstream operator(s), whether local or routed over the network?
I guess I'm mentally modeling the function I deploy as if it's a Lambda, which I guess it is not, since the Job Manager is doing some magic behind the scenes to determine the job graph in the cluster?
The task managers each know the following information:

- the job's parallelism
- the job's maximum parallelism (which fixes the total number of key groups)
- the index of each of their own parallel task instances

When a job is (re)started, each task manager uses this information to independently (and consistently) compute the range of key groups it will be handling. Checkpoints are indexed by key group, so each task manager can efficiently fetch and load its slice of the checkpoint.
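To make that concrete, here is a simplified sketch of how each parallel instance can work out its own key-group range from just those three numbers. It mirrors the arithmetic in Flink's KeyGroupRangeAssignment, but it is a standalone illustration, not the actual Flink class:

```java
// Simplified sketch of how each parallel instance computes its own key-group range,
// modeled on Flink's KeyGroupRangeAssignment (not the actual Flink code).
public class KeyGroupRangeSketch {

    // Returns {startKeyGroup, endKeyGroup} (inclusive) for the given subtask index.
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;   // total number of key groups
        int parallelism = 4;        // e.g. 4 parallel instances of the keyed operator

        // Each subtask can run this same computation independently and get a consistent answer.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int[] r = rangeForSubtask(maxParallelism, parallelism, subtask);
            System.out.printf("subtask %d owns key groups %d..%d%n", subtask, r[0], r[1]);
        }
    }
}
```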
Network shuffles (keyBy) are done by having each TM compute, for each record, first the key, then the key group, and then the index of the TM responsible for that key group.
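Here's a simplified sketch of that per-record computation. It follows the same logic Flink uses, but substitutes a plain hashCode() where Flink applies a murmur hash on top, so the actual numbers won't match a real cluster:

```java
// Simplified sketch of the per-record routing done for a keyBy shuffle.
public class KeyByRoutingSketch {

    static int keyGroupFor(Object key, int maxParallelism) {
        // Flink actually uses murmurHash(key.hashCode()) % maxParallelism.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        // Which downstream parallel instance owns this key group.
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;

        for (String userId : new String[] {"001", "334", "778"}) {
            int keyGroup = keyGroupFor(userId, maxParallelism);
            int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
            System.out.printf("userId %s -> key group %d -> subtask %d%n", userId, keyGroup, subtask);
        }
    }
}
```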
Kafka partitioning is a completely separate concern -- Flink does not try to align its keying or partitioning (into key groups) with what Kafka is doing. Each instance of the Kafka source is aware of its task index and the total number of Kafka source tasks, as well as the set of topics and the number of partitions in each topic. The source tasks end up with a round-robin distribution of the partitions, with each instance independently computing which Kafka partitions it should be handling.
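A toy version of that kind of round-robin assignment looks like the sketch below. The real connector also offsets the starting position (e.g. by a hash of the topic name) so different topics don't all pile onto subtask 0, so treat this as illustrative rather than exact:

```java
// Simplified sketch of a round-robin assignment of Kafka partitions to source subtasks.
public class KafkaPartitionAssignmentSketch {

    static int ownerOf(int partition, int numSourceSubtasks) {
        return partition % numSourceSubtasks;
    }

    public static void main(String[] args) {
        int numPartitions = 10;      // partitions in the topic
        int numSourceSubtasks = 4;   // parallel Kafka source instances

        // Each source subtask runs this same computation independently and keeps
        // only the partitions whose owner index matches its own task index.
        for (int partition = 0; partition < numPartitions; partition++) {
            System.out.printf("partition %d -> source subtask %d%n",
                    partition, ownerOf(partition, numSourceSubtasks));
        }
    }
}
```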
I hope that was clear. I think you had it basically right, but I tried to help by making it more concrete.