java, google-cloud-platform, google-cloud-dataflow, apache-beam, apache-beam-io

Apache Beam batch application - timer callback not executing


I'm currently building a Beam batch-processing application using the Java APIs. My source datasets are bounded and may or may not contain timestamps. The app reads from BigQuery tables, and the flow goes like this:

1 - read the source table from BigQuery

2 - group the data into chunks and prepare the Google DLP API call - the API has limits per call, so I use state to buffer the data and trigger the API call as soon as the buffer reaches a certain size

3 - for the remaining buffered data that didn't reach the threshold, a timer callback should fire to flush out the rest

However, the timer callback is never executed. Here's my example code:

@TimerId("stale")
private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

@StateId("buffer")
private final StateSpec<BagState<KV<Integer, TableRow>>> bufferedEvents = StateSpecs.bag();

@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();


@ProcessElement
public void process(ProcessContext context,
                    @StateId("buffer") BagState<KV<Integer, TableRow>> bufferState,
                    @StateId("count") ValueState<Integer> countState,
                    @TimerId("stale") Timer staleTimer) throws RuntimeException {

        // set the stale timer when the first element of a new batch arrives
        if (firstNonNull(countState.read(), 0) == 0) {
            staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
        }

        int count = firstNonNull(countState.read(), 0);
        count = count + 1;
        countState.write(count);
        bufferState.add(context.element());
        int currentContentItemCount = count * this.dlpTableHeader.get().size();

        if (currentContentItemCount >= (DLP_MAX_ITEM_PER_CALL - INTERNAL_BUFFER)) {
               // execute call to Cloud DLP
               // clear buffer and count etc.
        }
}

@OnTimer("stale")
public void onStale(OnTimerContext context,
                    @StateId("buffer") BagState<KV<Integer, TableRow>> bufferState,
                    @StateId("count") ValueState<Integer> countState) throws IOException {
     if (Boolean.FALSE.equals(bufferState.isEmpty().read())) {
      // call DLP API
      // clear buffer, count etc.
     }
}
The pipeline looks like this:

pipeline
.apply("Read BQ source data", BQSourceReader.readSource(sourceSerDeUtil, tableSchema, dlpArgsOptions))
.apply("Generate Fake KV", ParDo.of(new GenerateFakeKV()))
.apply("Inspect Content of DLPTables from buffered TableRows",
                        ParDo.of(new DLPInspectContentTransform(ValueProvider.StaticValueProvider.of(dlpTableHeader))));

I'm not sure why the timer callback is not being executed. Do I need to introduce a fixed window, or attach a dummy timestamp to each element? Thank you in advance!


Solution

  • In the end, I resolved this by explicitly defining a global window with triggers:

    Trigger subtrigger = AfterProcessingTime.pastFirstElementInPane();
    Trigger maintrigger = Repeatedly.forever(subtrigger);
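
    As a minimal sketch of how those triggers could be wired in, the windowing transform below would go between "Generate Fake KV" and the stateful ParDo. The element type `KV<Integer, TableRow>` is taken from the question's code; `Window`, `GlobalWindows`, `AfterProcessingTime`, and `Repeatedly` are from the Beam SDK, and `Duration` is Joda-Time. Note that Beam requires `withAllowedLateness` whenever a non-default trigger is set; the exact pane-accumulation mode is an assumption here, not something stated in the original answer.

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Trigger;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // Fire repeatedly, some time after the first element of each pane.
    Trigger subtrigger = AfterProcessingTime.pastFirstElementInPane();
    Trigger maintrigger = Repeatedly.forever(subtrigger);

    // ...
    .apply("Window into global window",
            Window.<KV<Integer, TableRow>>into(new GlobalWindows())
                    .triggering(maintrigger)
                    // required once a custom trigger is set
                    .withAllowedLateness(Duration.ZERO)
                    // don't re-emit already-fired elements in later panes
                    .discardingFiredPanes())
    // ... then the stateful DLP inspect ParDo as in the question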