apache-flinkflink-streamingamazon-kinesis-analytics

Flink operation does not distribute the incoming messages equally to all subtasks


I have a Java Flink (version 1.15) application with an Async I/O operation running in AWS Kinesis Flink Runtime with parallelism set to 12. The operation reads a stream of messages from a FlinkKinesisConsumer and processes it in an async operation.

FlinkKinesisConsumer:

DataStream<> inputMessages =
    env.addSource(new FlinkKinesisConsumer<>(kinesisStream, new Deserializer(),  
                     streamSourceProperties));

Async operation:

DataStream<> processedStream =
            AsyncDataStream.unorderedWait(inputMessages, new AsyncIOFunction(environment), 1000,
                          TimeUnit.MILLISECONDS, 100);

In the Flink Dashboard, I see that the FlinkKinesisConsumer and the Async operation has 12 subtasks. However the data is only processed in on of the subtasks.

The following screenshots shows the records processed in one subtask, while the others have no load.

Kinesis Consumer:

enter image description here

Async I/O operation: enter image description here

Why is this distribution not equal? Will this have an impact on performance if the load is high? Also, how can I make the processing equally distributed among the subtasks?

I saw this question Equally distribute operators with single parallelism in a multi-parallel Flink application but I am still not sure of the answer for this.


Solution

  • It seems that your Kinesis configuration has only one shard, and therefore the FlinkKinesisSource reads it with only one subtask, even if the parallelism is greater than that.

    Please read this part in the documentation: enter image description here

    The best approach here is to have multiple shards (best setting would be as much as your parralelism count). If you can't do that, you can always use keyBy or random partitioning to shuffle the records to other subtasks after Flink fetchs it from Kinesis, but this approach is less efficient and has greater network load on the buffers.