streamingapache-flinkflink-streamingstreaming-analytics

Accessing per-key state store in Apache Flink that changes dynamically


I have a stream of messages with different keys. For each key, I want to create an event time session window and do some processing on it only if:

For each key, MIN_EVENTS is different and might change during runtime. I am having difficulty implementing this. In particular, I am implementing this logic like so:

        inputStream.keyBy(key).
        window(EventTimeSessionWindow(INACTIVITY_PERIOD).
        trigger(new MyCustomCountTrigger()).
        apply(new MyProcessFn())

I am trying to create a custom MyCustomCountTrigger() that should be capable of reading from a state store such as MapState<String, Integer> stateStore that maps key to it's MIN_EVENTS parameter. I am aware that I can access a state store using the TriggerContext ctx object that is available to all Triggers.

How do I initialize this state store from outside the CountTrigger() class? I haven't been able to find examples to do so.


Solution

  • You can initialize the state based on parameters sent to the constructor of your Trigger class. But you can't access the state from outside that class.

    If you need more flexibility, I suggest you use a process function instead of a window.