I'm trying to use state as a cache in my Flink job. I keep a counter in keyed state for each key (I'm using a keyed stream), and I validate each incoming event against this counter to decide whether it should be sent on for further processing. Since my state has a TTL, if I don't find the counter for a key in state, I have to query the DB to get the counter and also update the state, so that subsequent events can read it directly from state.

My main question is: what is the right way to query the external database (Cassandra in my case) and write the fetched data back into my state?
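For reference, here's roughly what I have so far (simplified; `Tuple2<String, Long>` stands in for my real event type, and the TTL and names are placeholders). The `fetchCounterFromCassandra()` part is what I'm unsure about:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Events are (key, count); the stream is keyed by the String key.
public class CounterValidator
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> counterState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("counter", Long.class);
        descriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.hours(1)).build());
        counterState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long counter = counterState.value();
        if (counter == null) {
            // Counter expired or never loaded: this is where I need to go to
            // Cassandra and then write the result back into state.
            counter = fetchCounterFromCassandra(ctx.getCurrentKey());
            counterState.update(counter);
        }
        if (event.f1 <= counter) {
            out.collect(event);   // passes validation, send downstream
        }
    }

    private long fetchCounterFromCassandra(String key) {
        throw new UnsupportedOperationException("this is the part I'm asking about");
    }
}
```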
You've encountered a common situation in Flink where there's no great solution (for me, "great" means efficient, simple, and reliable). You're basically trying to use Flink state as a local cache for Cassandra. Some of your options are:
Just query Cassandra whenever the key doesn't exist in state. Obviously very simple, but not efficient if you have a lot of cases where state either hasn't been loaded yet or has already expired.
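A minimal sketch of what that could look like, assuming a `KeyedProcessFunction` and the DataStax 4.x driver (the keyspace, table, and column names are made up):

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SyncLookupValidator
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> counterState;
    private transient CqlSession session;
    private transient PreparedStatement lookup;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("counter", Long.class);
        descriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.hours(1)).build());
        counterState = getRuntimeContext().getState(descriptor);

        // One session per parallel instance; contact points come from application.conf.
        session = CqlSession.builder().build();
        lookup = session.prepare("SELECT counter FROM my_ks.counters WHERE key = ?");
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long counter = counterState.value();
        if (counter == null) {
            // Cache miss: blocking read from Cassandra, then refresh the state.
            // This stalls the operator for the duration of the query.
            Row row = session.execute(lookup.bind(ctx.getCurrentKey())).one();
            counter = (row != null) ? row.getLong("counter") : 0L;
            counterState.update(counter);
        }
        if (event.f1 <= counter) {
            out.collect(event);
        }
    }

    @Override
    public void close() {
        if (session != null) {
            session.close();
        }
    }
}
```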
Don't bother using state as a cache, and instead leverage Flink's Async I/O support for efficient reads from an external data source like Cassandra. You can still use a local cache (like ehcache, which can be configured to spill to disk) for what you read from Cassandra. The only real downside is that when the workflow restarts, you can't leverage the persisted cache.
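A rough sketch of that approach, using the DataStax 4.x async API; a plain `ConcurrentHashMap` stands in for a real cache like ehcache, and the query and table names are placeholders:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncCounterLookup
        extends RichAsyncFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient CqlSession session;
    private transient PreparedStatement lookup;
    // Stand-in for a real local cache (ehcache could also spill to disk).
    private transient Map<String, Long> cache;

    @Override
    public void open(Configuration parameters) {
        session = CqlSession.builder().build();
        lookup = session.prepare("SELECT counter FROM my_ks.counters WHERE key = ?");
        cache = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(Tuple2<String, Long> event,
                            ResultFuture<Tuple2<String, Long>> resultFuture) {
        Long cached = cache.get(event.f0);
        if (cached != null) {
            emitIfValid(event, cached, resultFuture);
            return;
        }
        // Non-blocking Cassandra read; the callback completes the Flink future.
        session.executeAsync(lookup.bind(event.f0))
                .whenComplete((AsyncResultSet rs, Throwable failure) -> {
                    if (failure != null) {
                        resultFuture.completeExceptionally(failure);
                        return;
                    }
                    Row row = rs.one();
                    long counter = (row != null) ? row.getLong("counter") : 0L;
                    cache.put(event.f0, counter);
                    emitIfValid(event, counter, resultFuture);
                });
    }

    private void emitIfValid(Tuple2<String, Long> event, long counter,
                             ResultFuture<Tuple2<String, Long>> resultFuture) {
        if (event.f1 <= counter) {
            resultFuture.complete(Collections.singleton(event));
        } else {
            resultFuture.complete(Collections.emptyList());
        }
    }

    @Override
    public void close() {
        if (session != null) {
            session.close();
        }
    }
}
```

You'd then wire it in with something like `AsyncDataStream.unorderedWait(events, new AsyncCounterLookup(), 5000, TimeUnit.MILLISECONDS, 100)`.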
Use a stateful CoProcessFunction and split the incoming stream. If you get a cache miss (the key isn't in state), write the event to a side output, which then uses an Async I/O function to efficiently read from Cassandra. The lookup results get unioned with the stream of cache hits, and also have to be fed back upstream, via a sink that's also a source of these Cassandra records, into the initial CoProcessFunction, where they're used to update your cached state. Which is obviously a lot more complicated.
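A rough skeleton of just the CoProcessFunction part, assuming events and lookup results are both `(key, count)` tuples; the Async I/O lookup on the side output and the sink-that's-also-a-source feedback path are left out:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CachingValidator extends
        KeyedCoProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {

    // Cache misses go out this side output, get looked up via Async I/O,
    // and come back in (via the external feedback path) on the second input.
    public static final OutputTag<Tuple2<String, Long>> MISSES =
            new OutputTag<Tuple2<String, Long>>("cache-misses") {};

    private transient ValueState<Long> counterState;

    @Override
    public void open(Configuration parameters) {
        counterState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Long.class));
    }

    // Input 1: the events to validate.
    @Override
    public void processElement1(Tuple2<String, Long> event, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        Long counter = counterState.value();
        if (counter == null) {
            // Not in state: route to the side output for the async Cassandra lookup.
            ctx.output(MISSES, event);
        } else if (event.f1 <= counter) {
            out.collect(event);
        }
    }

    // Input 2: (key, counter) records coming back from the Cassandra lookup.
    @Override
    public void processElement2(Tuple2<String, Long> counterRecord, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        counterState.update(counterRecord.f1);
    }
}
```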
Create a CDC (change data capture) stream from Cassandra and use it as the "enrichment" stream into a CoProcessFunction. No external queries are needed, but you do wind up having to keep the entire Cassandra table in state. This is perhaps the most efficient and "modern" way to handle the issue, but it does require extra work on the ops side to configure Cassandra's CDC properly.
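On the Flink side, this variant boils down to connecting the two streams and letting the CDC records maintain the counter in state; roughly like this, assuming the CDC feed has already been turned into a `(key, counter)` stream (how that's done, e.g. Cassandra CDC into Kafka, is a separate exercise):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class CdcBackedValidation {

    private static final KeySelector<Tuple2<String, Long>, String> BY_KEY =
            new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> t) {
                    return t.f0;
                }
            };

    // events: (key, count) to validate; cdcChanges: (key, counter) from the CDC feed.
    public static DataStream<Tuple2<String, Long>> build(
            DataStream<Tuple2<String, Long>> events,
            DataStream<Tuple2<String, Long>> cdcChanges) {
        return events
                .connect(cdcChanges)
                .keyBy(BY_KEY, BY_KEY)
                .process(new Enricher());
    }

    static class Enricher extends
            KeyedCoProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> counterState;

        @Override
        public void open(Configuration parameters) {
            counterState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void processElement1(Tuple2<String, Long> event, Context ctx,
                                    Collector<Tuple2<String, Long>> out) throws Exception {
            // No external lookup: state already mirrors the relevant Cassandra rows.
            Long counter = counterState.value();
            if (counter != null && event.f1 <= counter) {
                out.collect(event);
            }
        }

        @Override
        public void processElement2(Tuple2<String, Long> change, Context ctx,
                                    Collector<Tuple2<String, Long>> out) throws Exception {
            counterState.update(change.f1);
        }
    }
}
```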