I have a stream of messages with different keys. For each key, I want to create an event-time session window and process it only if MIN_EVENTS events have accumulated in the window (essentially a keyed state). MIN_EVENTS is different for each key and might change during runtime. I am having difficulty implementing this. In particular, I am implementing this logic like so:
inputStream.keyBy(key)
    .window(EventTimeSessionWindows.withGap(INACTIVITY_PERIOD))
    .trigger(new MyCustomCountTrigger())
    .apply(new MyProcessFn())
I am trying to create a custom MyCustomCountTrigger() that can read from a state store, such as MapState<String, Integer> stateStore, that maps each key to its MIN_EVENTS parameter. I am aware that I can access a state store using the TriggerContext ctx object that is available to all Triggers.
How do I initialize this state store from outside the CountTrigger() class? I haven't been able to find examples to do so.
You can initialize the state based on parameters sent to the constructor of your Trigger class. But you can't access the state from outside that class.
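As a rough illustration of the constructor approach, here is a minimal sketch in plain Java. It is not a Flink Trigger: CountTriggerModel, its Result enum, and defaultMinEvents are hypothetical names, and the counts map only stands in for the per-key count that a real trigger would keep in state obtained through the TriggerContext. It assumes the key can be read from each incoming element.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Standalone model of a count trigger's decision logic; NOT a Flink class.
class CountTriggerModel implements Serializable {
    enum Result { CONTINUE, FIRE }

    // Thresholds are copied in at construction time and never change afterwards.
    private final Map<String, Integer> minEventsByKey;
    // Hypothetical fallback for keys without an explicit threshold.
    private final int defaultMinEvents;
    // Stand-in for the count a real trigger would keep in trigger state.
    private final Map<String, Integer> counts = new HashMap<>();

    CountTriggerModel(Map<String, Integer> minEventsByKey, int defaultMinEvents) {
        this.minEventsByKey = new HashMap<>(minEventsByKey);
        this.defaultMinEvents = defaultMinEvents;
    }

    // Counts one element for the key and fires once the key's threshold is reached.
    Result onElement(String key) {
        int count = counts.merge(key, 1, Integer::sum);
        int min = minEventsByKey.getOrDefault(key, defaultMinEvents);
        return count >= min ? Result.FIRE : Result.CONTINUE;
    }
}
```

Because the thresholds are fixed when the object is constructed, this shape cannot pick up a MIN_EVENTS value that changes while the job is running.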
If you need more flexibility, I suggest you use a process function instead of a window.
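To make the process-function suggestion more concrete, the following plain-Java sketch models the per-key bookkeeping such a function might do. It is only a model under stated assumptions: SessionCounterModel, setMinEvents, onEvent, and onTimer are hypothetical names, the maps stand in for keyed state, and onTimer stands in for an event-time timer callback. It also assumes the session should be processed when it closes, which is one possible reading of the requirement.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key session logic; the maps stand in for keyed state.
class SessionCounterModel {
    private final long gapMillis;          // inactivity period that ends a session
    private final int defaultMinEvents;    // hypothetical fallback threshold
    private final Map<String, Integer> minEvents = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();
    private final Map<String, Long> lastSeen = new HashMap<>();

    SessionCounterModel(long gapMillis, int defaultMinEvents) {
        this.gapMillis = gapMillis;
        this.defaultMinEvents = defaultMinEvents;
    }

    // Thresholds can be updated at any time, for example from a control stream.
    void setMinEvents(String key, int min) {
        minEvents.put(key, min);
    }

    // Records one event and returns the event time at which the session would expire.
    long onEvent(String key, long timestamp) {
        counts.merge(key, 1, Integer::sum);
        // Keep the latest timestamp so out-of-order events do not shorten the session.
        long latest = lastSeen.merge(key, timestamp, Long::max);
        return latest + gapMillis;
    }

    // Models a timer firing; returns true only if the session closed with enough events.
    boolean onTimer(String key, long timerTimestamp) {
        Long last = lastSeen.get(key);
        if (last == null || timerTimestamp != last + gapMillis) {
            return false; // stale timer: a later event extended the session
        }
        int count = counts.getOrDefault(key, 0);
        int min = minEvents.getOrDefault(key, defaultMinEvents);
        counts.remove(key);   // the session is over, so its state is cleared
        lastSeen.remove(key);
        return count >= min;
    }
}
```

The threshold is looked up only when the session closes, so a value changed partway through a session still takes effect for that session. In a real job, one timer would be registered per event at the timestamp returned by onEvent, and the stale-timer check would discard all but the last one.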