apache-flink partitioning actor flink-streaming flink-statefun

Flink stateful function address resolution for messaging


In a Flink DataStream job, suppose that an upstream operator is hosted on machine (task manager) m. How does the upstream operator know the machine (task manager) m’ on which the downstream operator is hosted? Are these data flow paths between upstream and downstream operators established during the initial scheduling of the job's subtasks (operators) by the JobManager, and are they then fixed for the lifetime of the application?

More generally, consider Flink Stateful Functions, where dynamic messaging is supported and data flows are not fixed or predefined. Given a function with key k that needs to send a message/event to another function with key k’, how does function k find the address of function k’ in order to message it? Does the Flink runtime keep key-to-machine mappings in some distributed data structure (e.g., a DHT, as in Microsoft Orleans), so that every function invocation involves accessing that data structure?

Note that I come from a Spark background, where, given the RDD/batch model, job graph tasks are executed consecutively (split at shuffle boundaries), and each shuffle subtask is told which machines hold the subset of keys that it should pull and process.

Thank you.


Solution

  • Even with stateful functions, the topology of the underlying Flink job is fixed at the time the job is launched. Every stateful functions job uses a job graph more or less like this one (the ingresses vary, but the rest is always like this):

    [Figure: job graph of a Stateful Functions application]

    Here you see that all loaded ingresses become Flink source operators emitting the input messages, and routers become flatmap operators chained to those sources.

    The flatmaps acting as routers transform the input messages into internal event envelopes, which essentially just wrap the message payload with its destination logical address. Envelopes are the on-the-wire data type for all messages flowing through the stream graph. The Stateful Functions runtime is centered on a function dispatcher operator, which runs instances of all loaded functions across all modules.
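    To make the envelope idea concrete, here is a minimal plain-Java sketch of a router that wraps an ingress record with its destination logical address. It does not use the Stateful Functions API: `Router`, `Address`, `Envelope`, the `"greeter"` function type, and the `userId,payload` input format are all hypothetical names chosen for illustration, and the runtime's real envelope type is internal to StateFun.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical router: plays the role of the flatmap chained to an ingress source.
final class Router {
    // Turns a raw ingress record "userId,payload" into an envelope addressed
    // to the (hypothetical) "greeter" function instance for that user.
    static List<Envelope> route(String rawRecord) {
        List<Envelope> out = new ArrayList<>();
        int comma = rawRecord.indexOf(',');
        if (comma < 0) {
            return out; // malformed input: emit nothing, like a flatmap that drops it
        }
        String userId = rawRecord.substring(0, comma);
        String payload = rawRecord.substring(comma + 1);
        out.add(new Envelope(new Address("greeter", userId), payload));
        return out;
    }

    public static void main(String[] args) {
        for (Envelope e : route("alice,hello")) {
            System.out.println(e);
        }
    }
}

// Hypothetical logical address: function type plus id (the key).
record Address(String functionType, String id) {}

// Hypothetical envelope: the payload plus the logical address it is destined for.
record Envelope(Address target, String payload) {}
```

    Note that the router never decides *where* the envelope goes physically; it only stamps the logical address, and the keyBy that follows turns that address into a concrete operator instance.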

    In between the router flatmap operator and the function dispatcher operator is a keyBy operation, which re-partitions the input streams using the target destination id as the key. This network shuffle guarantees that all messages intended for a given id are sent to the same instance of the function dispatcher operator.
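    This keyBy is what answers the address-resolution question: there is no directory lookup. Because the parallelism is fixed when the job is deployed, each sender can compute the receiving subtask locally from the key. The sketch below is modeled on the arithmetic in Flink's `KeyGroupRangeAssignment` (key to key group in `[0, maxParallelism)`, then key group to operator index `keyGroup * parallelism / maxParallelism`). It is an illustration under an explicit assumption: Flink applies MurmurHash to `key.hashCode()`, whereas `mix` below is a simpler stand-in, so the concrete numbers will not match a real job. `KeyGroupRouting` and its methods are hypothetical names.

```java
// Hypothetical helper illustrating key -> key group -> subtask routing.
final class KeyGroupRouting {
    // Stand-in bit mixer (NOT Flink's MurmurHash); result is made non-negative.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & 0x7fffffff;
    }

    // key -> key group in [0, maxParallelism)
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // key group -> index of the parallel operator subtask that owns it
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Full routing decision, computable by any sender without asking anyone.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String id : new String[] {"alice", "bob", "carol"}) {
            System.out.printf("%s -> key group %d -> subtask %d%n",
                id, keyGroupFor(id, maxParallelism), subtaskFor(id, maxParallelism, parallelism));
        }
    }
}
```

    Key groups are contiguous ranges per subtask: with `maxParallelism = 128` and `parallelism = 4`, key groups 0 to 31 belong to subtask 0, 32 to 63 to subtask 1, and so on.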

    On receipt, the function dispatcher extracts the target function address from the envelope, loads that function instance, and then invokes the function with the wrapped input (which was also in the envelope).
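    A minimal sketch of that dispatch step in plain Java. `Dispatcher`, its nested `Envelope`, and the `(state, message) -> newState` function shape are hypothetical simplifications; in the real runtime, per-address state lives in Flink keyed state rather than in a `HashMap`, and functions have a richer API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class Dispatcher {
    // Hypothetical envelope: target function type + id, plus the wrapped payload.
    record Envelope(String functionType, String id, String payload) {}

    // All loaded functions, looked up by function type.
    private final Map<String, BiFunction<Integer, String, Integer>> functionsByType = new HashMap<>();
    // Per-address state keyed by "type/id" (stand-in for Flink keyed state).
    private final Map<String, Integer> stateByAddress = new HashMap<>();

    void register(String functionType, BiFunction<Integer, String, Integer> fn) {
        functionsByType.put(functionType, fn);
    }

    // Extract the target address, load that instance's state, invoke, store the result.
    void dispatch(Envelope envelope) {
        BiFunction<Integer, String, Integer> fn = functionsByType.get(envelope.functionType());
        if (fn == null) {
            throw new IllegalArgumentException("unknown function type: " + envelope.functionType());
        }
        String address = envelope.functionType() + "/" + envelope.id();
        Integer current = stateByAddress.getOrDefault(address, 0);
        stateByAddress.put(address, fn.apply(current, envelope.payload()));
    }

    int stateOf(String functionType, String id) {
        return stateByAddress.getOrDefault(functionType + "/" + id, 0);
    }

    public static void main(String[] args) {
        Dispatcher d = new Dispatcher();
        // A counter function: counts messages received per id.
        d.register("counter", (count, msg) -> count + 1);
        d.dispatch(new Envelope("counter", "alice", "hi"));
        d.dispatch(new Envelope("counter", "alice", "again"));
        d.dispatch(new Envelope("counter", "bob", "hi"));
        System.out.println(d.stateOf("counter", "alice")); // 2
        System.out.println(d.stateOf("counter", "bob"));   // 1
    }
}
```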

    How do different instances of the function dispatcher send messages to each other?

    This is done by co-locating each function dispatcher with a feedback operator. All outgoing messages go through another network shuffle using the target function id as the key.

    This feedback operator creates a loop, or iteration, in the job graph. Stateful Functions can have cycles, or loops, in their messaging patterns, and are not limited to processing data with a DAG.
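    The feedback loop can be modeled in a single process: each dispatcher subtask has an inbox, and every outgoing message is re-routed by its target id into the inbox of the subtask that owns that id, which is what lets two functions message each other in a cycle. This is a toy model under stated assumptions: `FeedbackLoopSim`, `Msg`, and the ping/pong logic are hypothetical, routing uses `Math.floorMod(id.hashCode(), parallelism)` instead of key groups, and nothing here is checkpointed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FeedbackLoopSim {
    // A message addressed to a function id, carrying a hop counter so the cycle ends.
    record Msg(String targetId, int hopsLeft) {}

    // Stand-in for key-group routing: same id always maps to the same subtask.
    static int subtaskFor(String id, int parallelism) {
        return Math.floorMod(id.hashCode(), parallelism);
    }

    // Runs until all inboxes are empty; returns the "id@subtask" invocations in order.
    static List<String> run(int parallelism, Msg initial) {
        // One inbox per dispatcher subtask; routing into them plays the feedback shuffle.
        List<Deque<Msg>> inboxes = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            inboxes.add(new ArrayDeque<>());
        }
        inboxes.get(subtaskFor(initial.targetId(), parallelism)).add(initial);

        List<String> trace = new ArrayList<>();
        boolean progress = true;
        while (progress) {
            progress = false;
            for (int s = 0; s < parallelism; s++) {
                Msg m = inboxes.get(s).poll();
                if (m == null) {
                    continue;
                }
                progress = true;
                trace.add(m.targetId() + "@" + s);
                if (m.hopsLeft() > 0) {
                    // Hypothetical function logic: "ping" messages "pong" and vice versa (a cycle).
                    String next = m.targetId().equals("ping") ? "pong" : "ping";
                    inboxes.get(subtaskFor(next, parallelism)).add(new Msg(next, m.hopsLeft() - 1));
                }
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(4, new Msg("ping", 3)));
    }
}
```

    The cycle is only possible because the outgoing messages re-enter the dispatcher through the same key-based routing as the ingress messages; in a plain DAG, the dispatcher's output could only flow further downstream.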

    The feedback channel is checkpointed; messages are never lost in the case of failure.

    For more on this, I recommend this Flink Forward talk by Tzu-Li (Gordon) Tai: Stateful Functions: Polyglot Event-Driven Functions for Stateful Distributed Applications. The figure above is from his talk.