apache-kafka apache-flink flink-streaming stateful data-stream

How do I store a stream of data that rarely changes in Apache Flink?


Essentially, I have a Flink DataStream that reads from a Kafka topic which rarely changes.

This topic holds userConsumerIdentifier records like this:

{
"user_id":1,
"consumer_id": 1
}

I have another stream consuming from another Kafka topic which processes hundreds of events per hour, with records like

userEvents

{
"user_id":1,
"phone_number": "518-555-5555",
"zip": 11111
}

My goal is to essentially union or join the userConsumerIdentifier records with userEvents, so that each userEvents record also carries a consumer_id.

Where I'm getting confused is that the userConsumerIdentifier Kafka topic is pretty static. It rarely has records written to it.

How can I pull in userConsumerIdentifier, essentially store it for reference, and map it to my userEvents?

I looked into BroadcastState, but I'm not sure it's what I need, and I don't want to waste hours diving into a concept that isn't relevant to my use case.


Solution

  • In general, I think a temporal join from the SQL/Table API is the best way to approach this sort of streaming enrichment use case. This will join each userEvent with the version of the corresponding userConsumerIdentifier that was in effect at the time of the userEvent. I recently made a short video explaining the ins and outs of this approach: https://www.youtube.com/watch?v=ChiAXgTuzaA

    However, an event time temporal join relies on watermarks, and this will be problematic in your case, where one of the topics is mostly idle. Idle sources don't produce watermarks, and without watermarks, the temporal join won't emit any results. Working around this problem is doable, but a bit fiddly. You could, for example, define a watermark strategy for the mostly idle table that always returns the equivalent of Long.MaxValue as the watermark, which will have the effect of always joining with the latest version of the userConsumerIdentifier.
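    To make the semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) of what "the version in effect at the time of the userEvent" means. In Flink SQL the same idea is expressed with `FOR SYSTEM_TIME AS OF`. The class `TemporalJoinSketch` and its methods are hypothetical names invented for illustration, and the in-memory maps only stand in for the versioned table that Flink would maintain. `lookupLatest` illustrates the "always join with the latest version" behavior described above for the maximum-watermark workaround.

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical illustration only: simulates temporal join lookups in memory.
    class TemporalJoinSketch {
        // user_id -> (timestamp at which a version became valid -> consumer_id)
        private final Map<Integer, TreeMap<Long, Integer>> versions = new HashMap<>();

        // Record that userId maps to consumerId from validFrom onwards.
        void upsert(int userId, long validFrom, int consumerId) {
            versions.computeIfAbsent(userId, k -> new TreeMap<>()).put(validFrom, consumerId);
        }

        // Return the consumer_id in effect at eventTime, or null if no version existed yet.
        Integer lookup(int userId, long eventTime) {
            TreeMap<Long, Integer> history = versions.get(userId);
            if (history == null) {
                return null;
            }
            Map.Entry<Long, Integer> entry = history.floorEntry(eventTime);
            return entry == null ? null : entry.getValue();
        }

        // Return the most recent consumer_id regardless of the event's timestamp.
        Integer lookupLatest(int userId) {
            TreeMap<Long, Integer> history = versions.get(userId);
            return (history == null || history.isEmpty()) ? null : history.lastEntry().getValue();
        }
    }
    ```

    With two versions for user 1 (valid from 100 and from 200), an event at time 150 would see the first version, while `lookupLatest` would always see the second.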

    If the individual userConsumerIdentifier records are never updated, you could just do a regular inner join instead. You'd only get one result for each userEvent, but this will be expensive, because the runtime will store all of the user events forever so that updated results can be produced should updates ever arrive (even if you know they won't).

    You can implement this from scratch instead, but there are a few things to consider:

    For an example showing how to implement a similar use case using the DataStream API and keyed state, see https://github.com/confluentinc/flink-cookbook/tree/master/enrichment-join-with-buffering.
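    As a rough idea of the logic such a job needs, here is a plain-Java sketch (no Flink dependency) of an enrichment join that buffers events until the reference record for their key has arrived. This is not the code from the linked recipe. `BufferingEnricher`, its method names, and the output string format are all hypothetical. In a real DataStream job the two maps would presumably be keyed state (for example a `ValueState` and a `ListState` inside a `KeyedCoProcessFunction`), keyed by user_id.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical illustration only: in-memory stand-in for keyed state.
    class BufferingEnricher {
        // Latest known consumer_id per user_id (the rarely changing side).
        private final Map<Integer, Integer> consumerIdByUser = new HashMap<>();
        // Phone numbers of events that arrived before their user's consumer_id was known.
        private final Map<Integer, List<String>> pendingPhones = new HashMap<>();

        // Handle a userConsumerIdentifier record: store it and flush any buffered events.
        List<String> onIdentifier(int userId, int consumerId) {
            consumerIdByUser.put(userId, consumerId);
            List<String> out = new ArrayList<>();
            List<String> buffered = pendingPhones.remove(userId);
            if (buffered != null) {
                for (String phone : buffered) {
                    out.add(enrich(userId, consumerId, phone));
                }
            }
            return out;
        }

        // Handle a userEvents record: enrich it right away, or buffer it if no consumer_id is known yet.
        List<String> onUserEvent(int userId, String phoneNumber) {
            Integer consumerId = consumerIdByUser.get(userId);
            if (consumerId == null) {
                pendingPhones.computeIfAbsent(userId, k -> new ArrayList<>()).add(phoneNumber);
                return new ArrayList<>();
            }
            List<String> out = new ArrayList<>();
            out.add(enrich(userId, consumerId, phoneNumber));
            return out;
        }

        private static String enrich(int userId, int consumerId, String phoneNumber) {
            return "user_id=" + userId + ",consumer_id=" + consumerId + ",phone_number=" + phoneNumber;
        }
    }
    ```

    The buffering matters because nothing guarantees that the reference record is read before the first event for the same user. Note that events for a user whose identifier never arrives stay buffered indefinitely in this sketch, so a real job would likely need some expiry policy.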