java apache-flink data-stream stream-processing

Flink GlobalWindow trigger only processes the trigger event


I have a DataStream that is keyed by an event property (keyBy) and then passed to a GlobalWindow, which is triggered when a specific event arrives. The issue is that when the window fires, it only processes the trigger event. Here is the code:

public class MyEvent {
    public String key;
    public String value;

    public MyEvent() {
        // Default constructor
    }

    public MyEvent(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Key: " + key + ", Value: " + value;
    }
}




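// Imports used by the pipeline and the window function below
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class KeyedGlobalWindowTriggerExample {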

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<MyEvent> input = env.socketTextStream("localhost", 9091)
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) {
                        // Assuming the input stream is in the format "key,value"
                        String[] parts = value.split(",");
                        return new MyEvent(parts[0], parts[1]);
                    }
                });

        // Key By Event Property
        KeyedStream<MyEvent, String> keyedStream = input
                .keyBy(event -> event.key);

        //Create a Custom Trigger
        keyedStream.window(GlobalWindows.create())
                .trigger(new Trigger<MyEvent, GlobalWindow>() {
                    @Override
                    public TriggerResult onElement(MyEvent event, long timestamp, GlobalWindow window, TriggerContext ctx) {
                        if ("eod".equals(event.getKey())) {
                            return TriggerResult.FIRE;
                        }
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public void clear(GlobalWindow window, TriggerContext ctx) {
                        // Handle clearing of window state if necessary
                    }
                })
                .process(new MyProcessWindowFunction())
                .print();

        env.execute("Keyed Global Window Trigger Example");
    }
}

public class MyProcessWindowFunction extends ProcessWindowFunction<MyEvent, String, String, GlobalWindow> {
    @Override
    public void process(String key, Context context, Iterable<MyEvent> elements, Collector<String> out) {
        for (MyEvent element : elements) {
            out.collect(element.toString());
        }
    }
}

How can I make sure the GlobalWindow processes all of its events when the trigger event arrives?

If I send the following events:

test_one,one
event_two,two
event_three,three
event_four,four
eod,eod

The output is only: Key: eod, Events: [Key: eod, Value: eod, ]

I was expecting to see all the events that were sent to this window.


Solution

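  • keyBy partitions the stream by key, and each key gets its own GlobalWindow. The eod event therefore lands in a window that contains nothing else, which is why only eod is output when the trigger fires. To collect every event in a single window, modify your code as follows: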

    1. Remove the keyBy step: KeyedStream<MyEvent, String> keyedStream = input.keyBy(event -> event.key);
    2. Change keyedStream.window(GlobalWindows.create()) to input.windowAll(GlobalWindows.create()).
    3. Replace MyProcessWindowFunction, which extends ProcessWindowFunction<MyEvent, String, String, GlobalWindow>, with a ProcessAllWindowFunction. The non-keyed version has no key type parameter and no key argument in process():

       public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MyEvent, String, GlobalWindow> {
           @Override
           public void process(Context context, Iterable<MyEvent> elements, Collector<String> out) throws Exception {
               for (MyEvent element : elements) {
                   out.collect(element.toString());
               }
           }
       }
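
To see why the keyed version emits only eod, here is a minimal plain-Java sketch that imitates the buffering without using Flink at all. The class and method names (KeyedWindowSketch, fireKeyed, fireUnkeyed) are made up for illustration, and the buffers only approximate window state under the assumption that the trigger returns FIRE (contents are kept, not purged) when the key is "eod".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the two pipelines; this is not Flink API.
public class KeyedWindowSketch {

    // The sample input from the question, as {key, value} pairs.
    public static final List<String[]> SAMPLE = Arrays.asList(
            new String[] {"test_one", "one"},
            new String[] {"event_two", "two"},
            new String[] {"event_three", "three"},
            new String[] {"event_four", "four"},
            new String[] {"eod", "eod"});

    // Keyed case: one buffer per key, like one GlobalWindow per key after keyBy.
    public static List<String> fireKeyed(List<String[]> events) {
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (String[] e : events) {
            List<String> buffer = buffers.computeIfAbsent(e[0], k -> new ArrayList<>());
            buffer.add(e[0] + "," + e[1]);
            if ("eod".equals(e[0])) {
                // Firing emits only the buffer that belongs to the "eod" key.
                fired.addAll(buffer);
            }
        }
        return fired;
    }

    // Non-keyed case: a single shared buffer, like windowAll with one GlobalWindow.
    public static List<String> fireUnkeyed(List<String[]> events) {
        List<String> buffer = new ArrayList<>();
        List<String> fired = new ArrayList<>();
        for (String[] e : events) {
            buffer.add(e[0] + "," + e[1]);
            if ("eod".equals(e[0])) {
                // Firing emits everything collected so far, including the trigger event.
                fired.addAll(buffer);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(fireKeyed(SAMPLE));   // [eod,eod]
        System.out.println(fireUnkeyed(SAMPLE)); // all five events
    }
}
```

Keep in mind that windowAll runs with a parallelism of 1, so all events pass through a single task. If you need parallelism, an alternative is to keep keyBy but key on a field that the eod event shares with the events it should flush. Also, since the trigger returns FIRE, the window contents are retained after firing; TriggerResult.FIRE_AND_PURGE clears them instead.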