
Does Flink throughput decrease proportionally with the number of side outputs?


I have a Flink job with multiple downstream operators, and I want to route tuples to them based on a condition. The Flink documentation advertises side outputs for this use case.

However, when I send data to side outputs using a ProcessFunction, the throughput shown in the Flink Web UI decreases proportionally with the number of output channels.

When I connect all downstream operators directly to the upstream operator, or send data only to the main output channel, the UI does not show a decrease in throughput.

Minimal reproducible example

Setup: sending rate of 10K rec/s, a watermark issued every second (based on event time), and 3 downstream operators. To illustrate the issue, no filters are used for routing to the different channels, so every downstream operator receives all records.

Case 1: The uppermost downstream operator (Map) uses either the main output channel of TumblingEventTimeWindows -> Process, or one of its side outputs with the main output channel left idle. The other 2 downstream operators use side output channels of TumblingEventTimeWindows -> Process. The records received by each downstream operator are 1/3 of the records sent by TumblingEventTimeWindows -> Process.


Case 2: Send data only to the main output channel in the ProcessFunction and send nothing to the side outputs. Connect the uppermost downstream operator (Map) to the main output channel. The records sent by TumblingEventTimeWindows -> Process are approximately equal to the records received by the uppermost downstream operator.


Case 3: Connect all downstream operators (Map) directly to the main output channel of TumblingEventTimeWindows -> Process. The records sent by TumblingEventTimeWindows -> Process are equal to the records received by each downstream operator.



Is this behavior expected?


Solution

  • Answering my own question:

    When I look at the actual throughput metric (numRecordsInPerSecond) of the Map functions, it is similar in case 1 and case 3. My mistake was to infer throughput from the record counts shown in the overview. The number of records sent by the upstream operator increases with the number of side outputs, because a record is counted once for each output it is sent to. I wrongly assumed that each unique record is counted only once, even if it is sent to multiple outputs (which does not actually make sense, because Flink cannot easily infer this).
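To make the counting argument concrete, here is a minimal plain-Java sketch of the setup in case 1. It is a toy model, not Flink code: the `SideOutputCounting` and `Upstream` names are hypothetical, and it simply assumes that the upstream "records sent" counter goes up by one for every emit to any output channel, as described above.

```java
/** Toy model of per-output record counting. This is NOT Flink code. */
public class SideOutputCounting {

    /** Hypothetical stand-in for an upstream operator with several output channels. */
    public static final class Upstream {
        private final long[] receivedPerChannel;
        private long recordsOut = 0;

        public Upstream(int channels) {
            this.receivedPerChannel = new long[channels];
        }

        /** Assumption: every emit counts as one sent record, whatever the payload is. */
        public void emit(int channel) {
            recordsOut++;
            receivedPerChannel[channel]++;
        }

        public long recordsOut() {
            return recordsOut;
        }

        public long received(int channel) {
            return receivedPerChannel[channel];
        }
    }

    /** Sends each of `records` input records to every one of `channels` outputs (no filters). */
    public static Upstream run(int records, int channels) {
        Upstream upstream = new Upstream(channels);
        for (int r = 0; r < records; r++) {
            for (int c = 0; c < channels; c++) {
                upstream.emit(c);
            }
        }
        return upstream;
    }

    public static void main(String[] args) {
        // One second of input at 10K rec/s, routed to 3 output channels.
        Upstream caseOne = run(10_000, 3);
        System.out.println("sent by upstream: " + caseOne.recordsOut());        // 30000
        System.out.println("received per downstream: " + caseOne.received(0));  // 10000
    }
}
```

In this model each downstream operator still receives 10,000 records per second, but the upstream counter reads 30,000, so each downstream count is 1/3 of the upstream count even though the per-operator throughput is unchanged.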