
Batch elements in Apache Flink


I have a stream of IDs in Apache Flink. I would like to batch them into sets of 500 and for each batch call an external service that will give me additional data for each ID. Then I want to forward each ID with the additional data further downstream. I'm using batching here for performance reasons because 1 request with 500 IDs is much faster than 500 requests with 1 ID.

I tried implementing this using windows, but I'm either getting tiny batches or no batches at all. In runtime execution mode BATCH I'm also losing the last remaining IDs.

Ideally I would like to:

I'm a bit lost with the DataSet API: which functions should I use, and how should I structure the program?


Solution

  • With the (recommended) DataStream API, and the goal of having a scalable, reliable workflow, one approach is the following:

    1. In a map function, convert each incoming record to a Tuple2<key, record>. The key would be an Integer hash calculated from one or more stable fields in the incoming record. By "stable" I mean fields whose values wouldn't change if you re-ran the workflow on the same data, so not (say) a field holding the ingest time.
    2. Key the stream by the Tuple2.f0 (first field).
    3. Implement a KeyedProcessFunction. This saves incoming records in ListState and also registers an event-time timer set to MAX_WATERMARK. When it has 500 records, or when the timer fires (which happens once all of the incoming data has been received), it outputs a record containing the batch of incoming records.
    4. Follow that with a RichAsyncFunction, where you call the remote service with the batch of records and use the response to enrich (and then emit) the records.
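
To make the buffering logic in steps 1 to 3 concrete, here is a rough, Flink-free model in plain Java. It is only a sketch under some assumptions: the class name BatchingSketch, the constant NUM_BUCKETS, and the method names are made up for illustration and are not Flink API. It also assumes the hash is reduced to a small number of buckets so that many IDs share a key, which the steps above do not spell out. In a real job the per-key buffer would live in ListState inside the KeyedProcessFunction, and the end-of-input flush would be done by the timer set to MAX_WATERMARK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "buffer per key, emit at 500 or at end of input". Not Flink code. */
public class BatchingSketch {
    public static final int BATCH_SIZE = 500; // batch size from the question
    public static final int NUM_BUCKETS = 4;  // hypothetical: number of distinct keys

    // Stands in for keyed ListState: one buffer per key.
    private final Map<Integer, List<String>> buffers = new HashMap<>();
    // Stands in for the Collector: batches handed to the next operator.
    private final List<List<String>> emitted = new ArrayList<>();

    /** Step 1: a stable key. String.hashCode() is specified, so re-runs give the same key. */
    public static int bucketFor(String id) {
        return Math.floorMod(id.hashCode(), NUM_BUCKETS);
    }

    /** Step 3, per record: append to the key's buffer and emit once it is full. */
    public void processElement(String id) {
        List<String> buffer = buffers.computeIfAbsent(bucketFor(id), k -> new ArrayList<>());
        buffer.add(id);
        if (buffer.size() >= BATCH_SIZE) {
            flush(buffer);
        }
    }

    /** Step 3, end of input: plays the role of the timer firing at MAX_WATERMARK. */
    public void onEndOfInput() {
        for (List<String> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                flush(buffer);
            }
        }
    }

    /** Emits a copy of the buffer as one batch, then clears the buffer. */
    private void flush(List<String> buffer) {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public List<List<String>> emittedBatches() {
        return emitted;
    }

    public static void main(String[] args) {
        BatchingSketch sketch = new BatchingSketch();
        for (int i = 0; i < 1234; i++) {
            sketch.processElement("id-" + i);
        }
        sketch.onEndOfInput(); // without this, partially filled buffers would be lost
        int total = 0;
        for (List<String> batch : sketch.emittedBatches()) {
            total += batch.size();
        }
        System.out.println(sketch.emittedBatches().size() + " batches, " + total + " ids");
    }
}
```

The end-of-input flush is the part that addresses the "losing the last remaining IDs" problem: every buffer that never reached 500 is still emitted once. The number of buckets is a trade-off. More buckets allow more parallelism, but each bucket fills more slowly, so more batches end up smaller than 500.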