google-cloud-dataflow · apache-beam · spotify-scio

How to limit PCollection in Apache Beam as soon as possible?


I'm using Apache Beam 2.28.0 on Google Cloud DataFlow (with Scio SDK). I have a large input PCollection (bounded) and I want to limit / sample it to a fixed number of elements, but I want to start the downstream processing as soon as possible.

Currently, when my input PCollection has e.g. 20M elements and I want to limit it to 1M by using Sample.any (https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html#any-long-)

input.apply(Sample.<String>any(1000000))

it waits until all of the 20M elements are read, which takes a long time.

How can I efficiently limit the number of elements to a fixed size and start downstream processing as soon as the limit is reached, discarding the rest of the input?


Solution

  • OK, so my initial solution is to use a stateful DoFn like this (I'm using Scio's Scala SDK, as mentioned in the question):

    import java.lang.{Long => JLong}
    
    import org.apache.beam.sdk.state.{StateSpecs, ValueState}
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
    import org.apache.beam.sdk.values.KV
    
    class MyLimitFn[T](limit: Long) extends DoFn[KV[String, T], KV[String, T]] {
      // Per-key element counter; with a single synthetic key it acts as a global counter.
      @StateId("count") private val count = StateSpecs.value[JLong]()
    
      @ProcessElement
      def processElement(context: DoFn[KV[String, T], KV[String, T]]#ProcessContext, @StateId("count") count: ValueState[JLong]): Unit = {
        // The state has no value (null) for the first element seen for a key.
        val current = Option(count.read()).map(_.longValue()).getOrElse(0L)
        if (current < limit) {
          count.write(current + 1L)
          context.output(context.element())
        }
      }
    }
    

    The downside of this solution is that I need to synthetically add the same key (e.g. an empty string) to all elements before applying it (see the usage sketch below). So far, it's much faster than Sample.any().

    I'm still looking forward to seeing better / more efficient solutions.
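
    For completeness, here is a minimal sketch of how the synthetic keying and this DoFn could be wired together in Scio. It assumes the input is an SCollection[String] named input and a limit of 1M elements; the variable names are illustrative, and the applyTransform interop call and the KV coder derivation may need adjusting for your Scio version:

    import com.spotify.scio.values.SCollection
    import org.apache.beam.sdk.transforms.ParDo
    import org.apache.beam.sdk.values.KV
    
    // Hypothetical wiring, assuming `input: SCollection[String]`.
    val limited: SCollection[String] = input
      // Synthetic constant key so the stateful DoFn keeps a single, global counter.
      .map(s => KV.of("", s))
      // Apply the raw Beam stateful DoFn through Scio's Beam interop.
      .applyTransform(ParDo.of(new MyLimitFn[String](1000000L)))
      // Drop the synthetic key again for downstream processing.
      .map(_.getValue)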