
How to clear the whole MapState state with only one call


I know that mapState.clear() removes all the values in the state for the current key. My question is: is there a way to do something like mapState.clear() and clear the MapState for every key with just one call? Afterwards, something like mapState.isEmpty() would return true because the entries were removed for all keys, not just for the current one.

Thanks. Kind regards!


Solution

  • Because we are talking about a situation with nested maps, it's easy to get our terminology confused. So let's put this question into the context of an example.

    Suppose you have a stream of events about users, and inside a KeyedProcessFunction you are using a MapState<ATTR, VALUE> to maintain a map of attribute/value pairs for each user:

    userEvents
        .keyBy(e -> e.userId)
        .process(new ManageUserData())
    

    Inside the process function, any time you are working with MapState you can only manipulate the one map for the user corresponding to the event being processed,

    public static class ManageUserData extends KeyedProcessFunction<...> {
        MapState<ATTR, VALUE> userMap;
    }
    

    so userMap.clear() will clear the entire map of attribute/value pairs for one user, but leave the other maps alone.

    I believe you are asking if there's some way to clear all of the MapStates for all users at once. And yes, there is a way to do this, though it's a bit obscure and not entirely straightforward to implement.
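To make the distinction concrete, here is a toy, stdlib-only Java model of keyed MapState. It is not Flink code, and KeyedMapStateModel, applyToAllKeys() and the other names are hypothetical. Each key owns its own inner map; clear() and isEmpty() only see the map of the current key, while applyToAllKeys() stands in for what Context#applyToKeyedState does in Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Toy model of keyed MapState (hypothetical, not the Flink API): every key owns
 * its own inner map, and the per-key operations only see the current key's map.
 */
public class KeyedMapStateModel<K, A, V> {
    private final Map<K, Map<A, V>> mapsByKey = new HashMap<>();
    private K currentKey;

    /** Stands in for Flink setting the current key before processElement(). */
    public void setCurrentKey(K key) {
        currentKey = key;
    }

    /** Like MapState.put(): writes into the current key's map only. */
    public void put(A attr, V value) {
        mapsByKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(attr, value);
    }

    /** Like MapState.isEmpty(): inspects the current key's map only. */
    public boolean isEmpty() {
        Map<A, V> map = mapsByKey.get(currentKey);
        return map == null || map.isEmpty();
    }

    /** Like MapState.clear(): removes the current key's map only. */
    public void clear() {
        mapsByKey.remove(currentKey);
    }

    /** Plays the role of Context#applyToKeyedState: visits every key that has state. */
    public void applyToAllKeys(BiConsumer<K, Map<A, V>> fn) {
        // Copy the key set first, so fn can change which keys have state
        // without breaking the iteration.
        for (K key : new ArrayList<>(mapsByKey.keySet())) {
            fn.accept(key, mapsByKey.get(key));
        }
    }

    public static void main(String[] args) {
        KeyedMapStateModel<String, String, String> state = new KeyedMapStateModel<>();
        state.setCurrentKey("alice");
        state.put("color", "red");
        state.setCurrentKey("bob");
        state.put("color", "blue");

        state.clear();                         // clears bob's map only
        state.setCurrentKey("alice");
        System.out.println(state.isEmpty());   // prints false: alice's map is untouched

        state.applyToAllKeys((user, map) -> map.clear()); // clears every user's map
        System.out.println(state.isEmpty());   // prints true
    }
}
```

The point of the model is that no per-key call can reach another key's map; clearing everything needs a hook that iterates over all keys, which is exactly what Flink only exposes inside processBroadcastElement().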

    If you change the KeyedProcessFunction in this example to a KeyedBroadcastProcessFunction and connect a broadcast stream to the stream of user events, then inside the processBroadcastElement() method of that KeyedBroadcastProcessFunction you can call KeyedBroadcastProcessFunction.Context#applyToKeyedState to iterate over all of the users and clear each user's MapState. Each parallel instance only visits the keys it owns, but because every broadcast element is delivered to all parallel instances, together they clear the state for every user.

    You will have to arrange to send an event on the broadcast stream whenever you want this to happen.
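A minimal sketch of that setup, assuming the Flink 1.x DataStream API (it overrides open(Configuration), which newer releases deprecate) and String keys, attributes, and values. UserEvent, ClearAll, wire(), and the descriptor names are hypothetical, not part of Flink. Since this pattern does not need any broadcast state of its own, an unused descriptor is passed to broadcast(), which requires at least one MapStateDescriptor.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

/** Sketch: key = userId; UserEvent and ClearAll are hypothetical types. */
public class ManageUserData extends KeyedBroadcastProcessFunction<
        String, ManageUserData.UserEvent, ManageUserData.ClearAll, String> {

    /** Hypothetical POJO for user events. */
    public static class UserEvent {
        public String userId;
        public String attr;
        public String value;
        public UserEvent() {}
    }

    /** Hypothetical, empty control message that triggers the global clear. */
    public static class ClearAll {
        public ClearAll() {}
    }

    // The same descriptor registers the per-user map and addresses it in applyToKeyedState.
    static final MapStateDescriptor<String, String> USER_MAP =
            new MapStateDescriptor<>("userMap", Types.STRING, Types.STRING);

    // broadcast() needs at least one descriptor; this broadcast state is never used.
    static final MapStateDescriptor<Void, Void> UNUSED_BROADCAST_STATE =
            new MapStateDescriptor<>("unusedBroadcastState", Types.VOID, Types.VOID);

    private transient MapState<String, String> userMap;

    @Override
    public void open(Configuration parameters) {
        userMap = getRuntimeContext().getMapState(USER_MAP);
    }

    @Override
    public void processElement(UserEvent e, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Scoped to the current key: only this user's map is updated.
        userMap.put(e.attr, e.value);
    }

    @Override
    public void processBroadcastElement(ClearAll cmd, Context ctx, Collector<String> out)
            throws Exception {
        // Visits every key on this parallel instance that has USER_MAP state and clears it.
        // The same ClearAll reaches every instance, so together they clear all users.
        ctx.applyToKeyedState(USER_MAP, (userId, state) -> state.clear());
    }

    /** Wiring: key the user events, then connect the broadcast control stream. */
    public static DataStream<String> wire(
            DataStream<UserEvent> userEvents, DataStream<ClearAll> clearCommands) {
        return userEvents
                .keyBy(e -> e.userId)
                .connect(clearCommands.broadcast(UNUSED_BROADCAST_STATE))
                .process(new ManageUserData());
    }
}
```

Each ClearAll element sent on clearCommands wipes every user's map, while ordinary user events keep updating only their own user's map. Clearing is the same operation on every instance, so it satisfies the determinism requirement for processBroadcastElement().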

    Pay attention to the warnings in the documentation about working with broadcast state, and keep in mind that the logic implemented in processBroadcastElement() must behave deterministically and identically across all parallel instances.