python google-cloud-dataflow apache-beam windowing

Dynamically define the size of Apache Beam windows


I am reading events from PubSub and the goal is to group them into windows. I would like to make the end of each window coincide with the minutes 0, 15, 30 and 45 of each hour.
Since this is a streaming job, it could be launched at any time, and I would like to find a way to align the size of the first window with the next ones.
This would be the flow:

  1. Launch the job
  2. Define as window_size the time remaining between this moment and the next quarter of an hour
  3. Starting from the end of this first window, set the window_size = int(15*60) (seconds).

For example:

  1. Launch the job
  2. Now it's 11:18, so fix window_size = (11:30-11:18).seconds
  3. When this first window ends, set window_size = int(15*60) (seconds)

In one of the examples provided by Google, the pipeline working with windowing is defined as follows, where window_size is a parameter passed as input by the user:

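import apache_beam as beam
from apache_beam import window

# Method of a PTransform subclass; self.window_size is the window length in seconds.
def expand(self, pcoll):
    return (
        pcoll
        # Assign each element to a fixed-size window based on its timestamp.
        | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
        # Use a single dummy key so that GroupByKey collects each window's elements together.
        | "Add Key" >> beam.Map(lambda elem: (None, elem))
        | "Groupby" >> beam.GroupByKey()
        | "Abandon Key" >> beam.MapTuple(lambda _, val: val)
    )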

Solution

  • Your use case is a perfect fit for Beam!

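    First, there is a basic conceptual issue to clear up: the difference between event time (the timestamp carried by each event, describing when it happened) and processing time (the moment your job actually handles the event).

    You will be more successful if you do not combine or confuse these two. Windows do not "start" or "end" as part of your job's processing time. They are intervals of event time, so they "exist" for all time.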

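    Using FixedWindows of 15 minutes will do just what you want. With the default offset of zero, the windows are aligned to the Unix epoch, so their boundaries fall on minutes 0, 15, 30 and 45 of each hour. Every event will be associated with the 15-minute interval that its timestamp falls into. Neither the time you launch your job nor the time an event arrives for processing affects this.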
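To make the alignment concrete, here is a minimal sketch in plain Python that mimics the assignment arithmetic, assuming 15-minute windows with zero offset relative to the Unix epoch. The helper name `window_bounds` and the date used are hypothetical; this is not the Beam API, only an illustration of why no special first window is needed.

```python
from datetime import datetime, timezone

WINDOW_SIZE = 15 * 60  # 15 minutes, in seconds


def window_bounds(event_ts, size=WINDOW_SIZE, offset=0):
    """Return (start, end) in epoch seconds of the fixed window containing event_ts.

    Hypothetical helper: rounds the event timestamp down to a multiple of `size`.
    """
    start = event_ts - (event_ts - offset) % size
    return start, start + size


# An event stamped 11:18 UTC on an arbitrary (made-up) date.
ts = datetime(2021, 1, 1, 11, 18, tzinfo=timezone.utc).timestamp()
start, end = window_bounds(ts)

fmt = "%H:%M"
print(
    datetime.fromtimestamp(start, timezone.utc).strftime(fmt),
    datetime.fromtimestamp(end, timezone.utc).strftime(fmt),
)  # prints "11:15 11:30"
```

The event at 11:18 lands in the window from 11:15 to 11:30 without any knowledge of when the job was launched, which is why a shorter first window does not have to be computed by hand.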

    UPDATE: adding example to illustrate:

    Suppose you launch your job at 11:18 as in your question, and assume the incoming events are generated around the same time. Supposing the following events come in, with the timestamps indicated:

    The elements will be assigned to windows as follows:

    Note that the window assignment is unrelated to when you started your job, or when the event arrives, or the order of arrival. You could actually start it tomorrow, or re-run it on archived data, or on data that isn't even close to in order, and the result would be the same. Event time windowing is based on the data.
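The order-independence described above can be sketched in plain Python. The event names, timestamps and date below are made up for illustration, and `hhmm_to_epoch` and `group_by_window` are hypothetical helpers, not Beam functions. The sketch assumes 15-minute windows aligned to the Unix epoch.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SIZE = 15 * 60  # 15 minutes, in seconds


def hhmm_to_epoch(hhmm):
    """Hypothetical helper: 'HH:MM' on an arbitrary fixed UTC date -> epoch seconds."""
    hour, minute = map(int, hhmm.split(":"))
    return int(datetime(2021, 1, 1, hour, minute, tzinfo=timezone.utc).timestamp())


def group_by_window(events):
    """Group (name, event_time) pairs by the start of their 15-minute window."""
    groups = defaultdict(set)
    for name, ts in events:
        start = ts - ts % WINDOW_SIZE  # round down to the window boundary
        groups[start].add(name)
    return dict(groups)


# Made-up events; the list order stands for the order of arrival.
arrival_order = [("c", "11:31"), ("a", "11:19"), ("d", "11:44"), ("b", "11:29")]
events = [(name, hhmm_to_epoch(t)) for name, t in arrival_order]

in_arrival_order = group_by_window(events)
in_reverse_order = group_by_window(reversed(events))

# The grouping depends only on the event timestamps, not on arrival order.
print(in_arrival_order == in_reverse_order)  # prints True

for start in sorted(in_arrival_order):
    label = datetime.fromtimestamp(start, timezone.utc).strftime("%H:%M")
    print(label, sorted(in_arrival_order[start]))
# prints:
# 11:15 ['a', 'b']
# 11:30 ['c', 'd']
```

In a real pipeline the grouping is done by WindowInto followed by GroupByKey; the sketch only reproduces the assignment logic to show that the result is a function of the data alone.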