java partitioning flink-streaming skew

What are the other options to handle skewed data in Flink?


I am studying how to handle data skew in Flink, and how to use its low-level control over physical partitioning so that tuples are processed evenly. I have created synthetic skewed data sources and want to aggregate them over a window. Here is the complete code.

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .rebalance() // or .rescale() .shuffle()
        .keyBy(new StationPlatformKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .setParallelism(4)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;

According to the Flink dashboard, I could not see much difference among .shuffle(), .rescale(), and .rebalance(), even though the documentation says the rebalance() transformation is better suited to data skew.

After that I tried .partitionCustom(partitioner, "someKey"). However, to my surprise, I could not use setParallelism(4) on the window operation (windowAll). The documentation says:

Note: This operation is inherently non-parallel since all elements have to pass through the same operator instance.

I do not understand why. If I am allowed to use partitionCustom, why can't I set the parallelism after that? Here is the complete code.

streamTrainsStation01.union(streamTrainsStation02)
        .union(streamTicketsStation01).union(streamTicketsStation02)
        // map the keys
        .map(new StationPlatformMapper(metricMapper)).name(metricMapper)
        .partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
        .map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
        .addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
        ;

Thanks, Felipe


Solution

  • I got an answer from the Flink user mailing list. Basically, calling keyBy() after rebalance() cancels whatever rebalance() does, because keyBy() repartitions the stream by key hash. The first (ad hoc) solution I found is to create a composite key that accounts for the skewed key.

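    public class CompositeSkewedKeyStationPlatform implements Serializable {
        private static final long serialVersionUID = -5960601544505897824L;
        private Integer stationId;
        private Integer platformId;
        private Integer skewParameter;
        // Constructor used by the mapper; a key class also needs consistent equals()/hashCode() (omitted here).
        public CompositeSkewedKeyStationPlatform(Integer stationId, Integer platformId, Integer skewParameter) {
            this.stationId = stationId;
            this.platformId = platformId;
            this.skewParameter = skewParameter;
        }
    }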
    

    I use it in the map function before calling keyBy().

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    public class StationPlatformSkewedKeyMapper
            extends RichMapFunction<MqttSensor, Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor>> {
        // Produces the extra key component that splits the hot key into sub-keys.
        private SkewParameterGenerator skewParameterGenerator;
    
        public StationPlatformSkewedKeyMapper() {
            this.skewParameterGenerator = new SkewParameterGenerator(10);
        }
    
        @Override
        public Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor> map(MqttSensor value) throws Exception {
            Integer platformId = value.getKey().f2;
            Integer stationId = value.getKey().f4;
            // Non-skewed keys keep a constant skew parameter, so their grouping is unchanged.
            Integer skewParameter = 0;
    
            // Only the known hot key (station 2, platform 3) is spread over several sub-keys.
            if (stationId.equals(2) && platformId.equals(3)) {
                skewParameter = this.skewParameterGenerator.getNextItem();
            }
            CompositeSkewedKeyStationPlatform compositeKey = new CompositeSkewedKeyStationPlatform(stationId, platformId,
                    skewParameter);
            return Tuple2.of(compositeKey, value);
        }
    }
    

    Here is my complete solution.
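The composite-key idea can be illustrated without a Flink cluster. The sketch below is a simplified model, not Flink code: `SkewSaltingSketch`, `SaltedKey`, and all method names are hypothetical, the salt is assumed to be assigned round-robin (the real `SkewParameterGenerator` is not shown in the post), and partitions are chosen by plain `hashCode()` modulo parallelism, whereas Flink hashes keys into key groups. It shows two things: with a single key the hot station/platform lands on one partition no matter how records were distributed upstream, and with a salted key the load spreads but the per-salt partial counts must be merged again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class SkewSaltingSketch {

    /** Hypothetical stand-in for the composite key; equals/hashCode cover all three fields. */
    static final class SaltedKey {
        final int stationId;
        final int platformId;
        final int salt;

        SaltedKey(int stationId, int platformId, int salt) {
            this.stationId = stationId;
            this.platformId = platformId;
            this.salt = salt;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SaltedKey)) {
                return false;
            }
            SaltedKey k = (SaltedKey) o;
            return stationId == k.stationId && platformId == k.platformId && salt == k.salt;
        }

        @Override
        public int hashCode() {
            return Objects.hash(stationId, platformId, salt);
        }
    }

    /** Counts records per salted key; 9 of every 10 records belong to the hot key (station 2, platform 3). */
    static Map<SaltedKey, Integer> partialCounts(int records, int saltBuckets) {
        Map<SaltedKey, Integer> counts = new HashMap<>();
        int nextSalt = 0;
        for (int i = 0; i < records; i++) {
            boolean hot = i % 10 != 0;
            SaltedKey key = hot
                    ? new SaltedKey(2, 3, nextSalt++ % saltBuckets) // round-robin salt (assumption)
                    : new SaltedKey(1, 1 + (i / 10) % 4, 0);        // cold keys keep salt 0
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    /** Simplified partition assignment: hash modulo parallelism (not Flink's actual key-group hashing). */
    static int[] partitionLoad(Map<SaltedKey, Integer> counts, int parallelism) {
        int[] load = new int[parallelism];
        for (Map.Entry<SaltedKey, Integer> e : counts.entrySet()) {
            load[Math.floorMod(e.getKey().hashCode(), parallelism)] += e.getValue();
        }
        return load;
    }

    /** Second phase: drop the salt and sum the partial counts per station/platform. */
    static Map<String, Integer> mergeBySourceKey(Map<SaltedKey, Integer> counts) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map.Entry<SaltedKey, Integer> e : counts.entrySet()) {
            String sourceKey = e.getKey().stationId + "/" + e.getKey().platformId;
            merged.merge(sourceKey, e.getValue(), Integer::sum);
        }
        return merged;
    }

    static int max(int[] values) {
        int best = Integer.MIN_VALUE;
        for (int v : values) {
            best = Math.max(best, v);
        }
        return best;
    }

    public static void main(String[] args) {
        for (int buckets : new int[] {1, 10}) {
            Map<SaltedKey, Integer> partials = partialCounts(1000, buckets);
            System.out.println("salt buckets=" + buckets
                    + " busiest partition=" + max(partitionLoad(partials, 4))
                    + " merged=" + mergeBySourceKey(partials));
        }
    }
}
```

One consequence of this design: the window keyed by the composite key emits one partial result per skew parameter for the hot key. If a single value per station and platform is needed, a second aggregation keyed only by station and platform would have to combine those partial results, as `mergeBySourceKey` does here.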