
Connect a stream with watermarks with another one without watermarks in Flink


I have stream A and stream B in Flink.

I want to connect stream A and stream B (enrich A with B), with the resulting connected stream having the watermarks of A.

Since Flink takes the minimum of the two input watermarks, I thought of two approaches:

  1. From B, emit max watermarks and collect max timestamps.
  2. Mark B as temporarily idle after each emission.

I don't know which approach is correct, or whether there is a better way to handle this scenario.
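The effect of both approaches on the combined watermark can be seen in a small, simplified plain-Java model. It is not Flink's API: the class and method names are hypothetical, and it only mimics the rule that a two-input operator forwards the minimum watermark of its non-idle inputs and never moves its output watermark backward. (In a real job, idleness is usually declared on the source, e.g. with `WatermarkStrategy#withIdleness`.)

```java
/**
 * Simplified plain-Java model (NOT Flink's API) of how a two-input operator,
 * e.g. one running a CoProcessFunction on A.connect(B), derives its output
 * watermark: the minimum over the non-idle inputs, never moving backward.
 */
public class TwoInputWatermarkModel {
    private long watermarkA = Long.MIN_VALUE;
    private long watermarkB = Long.MIN_VALUE;
    private boolean bIdle = false;
    private long output = Long.MIN_VALUE;

    public void onWatermarkA(long wm) {
        watermarkA = Math.max(watermarkA, wm);
    }

    public void onWatermarkB(long wm) {
        watermarkB = Math.max(watermarkB, wm);
        bIdle = false; // a new watermark makes B active again
    }

    public void markBIdle() {
        bIdle = true;
    }

    /** Min of the active inputs; the output watermark is monotonic. */
    public long combined() {
        long candidate = bIdle ? watermarkA : Math.min(watermarkA, watermarkB);
        output = Math.max(output, candidate);
        return output;
    }

    public static void main(String[] args) {
        // Approach 1: B emits a max watermark, so A's watermark always wins the min.
        TwoInputWatermarkModel approach1 = new TwoInputWatermarkModel();
        approach1.onWatermarkB(Long.MAX_VALUE);
        approach1.onWatermarkA(1_000L);
        System.out.println(approach1.combined()); // 1000

        // Approach 2: B's watermark is stale, but B is marked idle and ignored.
        TwoInputWatermarkModel approach2 = new TwoInputWatermarkModel();
        approach2.onWatermarkB(10L);
        approach2.markBIdle();
        approach2.onWatermarkA(1_000L);
        System.out.println(approach2.combined()); // 1000

        // Neither: B's stale watermark holds the output back.
        TwoInputWatermarkModel neither = new TwoInputWatermarkModel();
        neither.onWatermarkB(10L);
        neither.onWatermarkA(1_000L);
        System.out.println(neither.combined()); // 10
    }
}
```

Without either approach, the stale watermark from B caps the output, which is exactly the behavior the question is trying to avoid.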


Solution

  • As David noted, you can use Async I/O (via Flink's Java API) or the HTTP TableLookup connector (via Flink's Table API).

    If your API doesn't work with the HTTP TableLookup connector, and you want to cache your past lookups in state, then it can make sense to use your approach of making stream B a source. In that situation, if nothing in the records you fetch via the API provides an "event time", I would use processing time as the event time for each record.
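    A minimal sketch of the caching idea, assuming the external API can be modeled as a function from key to value. This is plain Java rather than Flink code: the `HashMap` stands in for Flink keyed state, and `CachedLookupModel`, `Enrichment`, and the `api` function are hypothetical names. Each fetched record is stamped with the processing time at which it was fetched, which then serves as its event time.

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.LongSupplier;

    /**
     * Plain-Java model of a cached lookup. The HashMap stands in for Flink keyed
     * state, `api` is a hypothetical external call, and `clock` supplies processing
     * time, which is stamped onto each fetched record as its event time.
     */
    public class CachedLookupModel {
        /** A fetched B record; fetchedAt is the processing time used as its event time. */
        public record Enrichment(String key, String value, long fetchedAt) {}

        private final Map<String, Enrichment> cache = new HashMap<>();
        private final Function<String, String> api;
        private final LongSupplier clock;
        private int apiCalls = 0;

        public CachedLookupModel(Function<String, String> api, LongSupplier clock) {
            this.api = api;
            this.clock = clock;
        }

        /** Returns the cached record, calling the API only on a cache miss. */
        public Enrichment lookup(String key) {
            return cache.computeIfAbsent(key, k -> {
                apiCalls++;
                return new Enrichment(k, api.apply(k), clock.getAsLong());
            });
        }

        public int apiCalls() {
            return apiCalls;
        }

        public static void main(String[] args) {
            CachedLookupModel lookup =
                new CachedLookupModel(k -> "value-for-" + k, System::currentTimeMillis);
            Enrichment first = lookup.lookup("customer-42");
            Enrichment second = lookup.lookup("customer-42"); // served from the cache
            System.out.println(first == second);   // true
            System.out.println(lookup.apiCalls()); // 1
        }
    }
    ```

    In an actual Flink job the cache would live in keyed state (for example a `MapState` or `ValueState` in a `KeyedCoProcessFunction`); this model only shows the lookup-then-stamp logic.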

    Note that these situations often require a time-based join. For example, if stream A contained transactions and stream B contained currency conversion data, you'd want to find the "best" conversion rate (the one whose timestamp is at or before, but closest to, the transaction timestamp), which is much more complicated than a simple join. In that case it may make sense to switch to Flink's Table API/SQL support, which handles this kind of join.
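    The "best rate" rule can be sketched with a `TreeMap`, assuming the usual temporal-join semantics in which a transaction matches the latest rate whose timestamp is at or before the transaction timestamp (this is how Flink's event-time temporal join, `FOR SYSTEM_TIME AS OF`, picks a version). `TemporalRateLookup` is a hypothetical name and the rates are made-up sample values. In a real stream you also have to wait until watermarks guarantee that no earlier rate can still arrive, which is part of what makes this harder than a plain join.

    ```java
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    /** Hypothetical rate history for one currency pair: event timestamp -> rate. */
    public class TemporalRateLookup {
        private final NavigableMap<Long, Double> rates = new TreeMap<>();

        public void addRate(long timestamp, double rate) {
            rates.put(timestamp, rate);
        }

        /** Latest rate with timestamp <= transactionTs, or null if none exists yet. */
        public Double rateAt(long transactionTs) {
            Map.Entry<Long, Double> entry = rates.floorEntry(transactionTs);
            return entry == null ? null : entry.getValue();
        }

        public static void main(String[] args) {
            TemporalRateLookup eurUsd = new TemporalRateLookup();
            eurUsd.addRate(1_000L, 1.08);
            eurUsd.addRate(2_000L, 1.10);
            System.out.println(eurUsd.rateAt(1_500L)); // 1.08
            System.out.println(eurUsd.rateAt(2_000L)); // 1.1
            System.out.println(eurUsd.rateAt(500L));   // null
        }
    }
    ```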