I have a DataStream keyed by an event property, which is then passed to a GlobalWindow that is triggered when a specific event comes in. The issue is that when the window is triggered to process the events, it only processes the trigger event. Here is the code:
public class MyEvent {
    public String key;
    public String value;

    public MyEvent() {
        // Default constructor
    }

    public MyEvent(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Key: " + key + ", Value: " + value;
    }
}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class KeyedGlobalWindowTriggerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<MyEvent> input = env.socketTextStream("localhost", 9091)
            .map(new MapFunction<String, MyEvent>() {
                @Override
                public MyEvent map(String value) {
                    // Assuming the input stream is in the format "key,value"
                    String[] parts = value.split(",");
                    return new MyEvent(parts[0], parts[1]);
                }
            });
        // Key by event property
        KeyedStream<MyEvent, String> keyedStream = input
            .keyBy(event -> event.key);
        // Create a custom trigger
        keyedStream.window(GlobalWindows.create())
            .trigger(new Trigger<MyEvent, GlobalWindow>() {
                @Override
                public TriggerResult onElement(MyEvent event, long timestamp, GlobalWindow window, TriggerContext ctx) {
                    // Fire the window when the "eod" event arrives
                    if ("eod".equals(event.getKey())) {
                        return TriggerResult.FIRE;
                    }
                    return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
                    return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                    return TriggerResult.CONTINUE;
                }

                @Override
                public void clear(GlobalWindow window, TriggerContext ctx) {
                    // Handle clearing of window state if necessary
                }
            })
            .process(new MyProcessWindowFunction())
            .print();
        env.execute("Keyed Global Window Trigger Example");
    }
}
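import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class MyProcessWindowFunction extends ProcessWindowFunction<MyEvent, String, String, GlobalWindow> {
    @Override
    public void process(String key, Context context, Iterable<MyEvent> elements, Collector<String> out) {
        // Emit every element currently held in the window for this key
        for (MyEvent element : elements) {
            out.collect(element.toString());
        }
    }
}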
How can I make sure it processes all the events in the GlobalWindow when the trigger event arrives?
If I send the following events:
test_one, one
event_two, two
event_three, three
event_four, four
eod, eod
The output is only: Key: eod, Events: [Key: eod, Value: eod, ]
I was expecting to see all the events that were sent to this window.
The keyBy function splits your events into separate windows, one per key, so the eod event ends up alone in the window for the key "eod". That is why only eod is output. Modify your code as follows.
Change
    KeyedStream<MyEvent, String> keyedStream = input.keyBy(event -> event.key);
    keyedStream.window(GlobalWindows.create())
to
    input.windowAll(GlobalWindows.create())
and change
    public class MyProcessWindowFunction extends ProcessWindowFunction<MyEvent, String, String, GlobalWindow> {
        @Override
        public void process(String key, Context context, Iterable<MyEvent> elements, Collector<String> out) {
            for (MyEvent element : elements) {
                out.collect(element.toString());
            }
        }
    }
to
    public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MyEvent, String, GlobalWindow> {
        @Override
        public void process(Context context, Iterable<MyEvent> elements, Collector<String> out) throws Exception {
            for (MyEvent element : elements) {
                out.collect(element.toString());
            }
        }
    }
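To see why the keyed version only emits the trigger event, here is a minimal plain-Java sketch that mimics the two behaviours without Flink. It is only an illustration under simplifying assumptions: one buffer per key stands in for a keyed global window, a single shared buffer stands in for windowAll, and "firing" just copies the buffer contents. The class name WindowGroupingSketch and the methods fireKeyed and fireAll are made up for this example and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowGroupingSketch {

    // Keyed case (hypothetical stand-in for keyBy + GlobalWindows):
    // each distinct key gets its own buffer, and the "eod" trigger
    // only fires the buffer that belongs to the key "eod".
    public static List<String> fireKeyed(List<String[]> events) {
        Map<String, List<String>> windows = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] e : events) {
            List<String> window = windows.computeIfAbsent(e[0], k -> new ArrayList<>());
            window.add(e[0] + "=" + e[1]);
            if ("eod".equals(e[0])) {
                emitted.addAll(window);
            }
        }
        return emitted;
    }

    // Non-keyed case (hypothetical stand-in for windowAll + GlobalWindows):
    // all events share one buffer, so the "eod" trigger sees everything.
    public static List<String> fireAll(List<String[]> events) {
        List<String> window = new ArrayList<>();
        List<String> emitted = new ArrayList<>();
        for (String[] e : events) {
            window.add(e[0] + "=" + e[1]);
            if ("eod".equals(e[0])) {
                emitted.addAll(window);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
            new String[] {"test_one", "one"},
            new String[] {"event_two", "two"},
            new String[] {"event_three", "three"},
            new String[] {"event_four", "four"},
            new String[] {"eod", "eod"});

        System.out.println(fireKeyed(events)); // [eod=eod]
        System.out.println(fireAll(events));   // [test_one=one, event_two=two, event_three=three, event_four=four, eod=eod]
    }
}
```

The keyed variant reproduces what you observed: the buffer for "eod" contains nothing but the eod event, while the shared buffer contains every event sent before it.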