google-cloud-dataflow apache-beam bigtable

Run & skip steps based on a condition in an Apache Beam pipeline that reads data from multiple tables


  1. I read data from Bigtable using pipeline.apply
  2. Using the data from (1) as a side input, I read from another Bigtable table using pipeline.apply
  3. Finally, after some other steps in the pipeline, I run this step to update the status of the data fetched in (1)

How can I skip (2) when there is no data returned in (1)?

Step (3) completes before step (2). How do I make step (3) depend on step (2)?

(1)
    PCollection<trigger> readyOrInProgressTriggers =
      pipeline.apply(BigtableHelper.getBigtableIORead("trgr_sta"))
        .apply(TriggerStatus.bigTableRowToPojo())
        .apply(Filter.by(new FilterreadyInProgressTriggerFn()))
        .apply(ParDo.of(new TriggerStatus.updateTriggerStatus(Constant.IN_PROGRESS_STATUS)));

(2)

    PCollectionView<List<Event>> srcAggrEvents =
      pipeline.apply(BigtableHelper.getBigtableIORead("event"))
        .apply(
          ParDo.of(new FilterSrcAggrEventRowsFn(triggerList))
            .withSideInput("triggerList", triggerList))
        .apply(Event.bigTableRowToPojo())
        .apply(View.asList());

(3)

    readyOrInProgressTriggers.apply("update status", ParDo.of(
        new TriggerStatus.updateTriggerStatus(BNCConstant.COMPLETE_STATUS)))
      .apply(ParDo.of(new TriggerStatus.pojoToMutation()))
      .apply(BigtableHelper.writeToBigtable("trgr_sta"));


Solution

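  • How do I make step 3 dependent on step 2?

    Wait.on is another option: it holds back a PCollection until a signal PCollection is complete (per window), e.g. readyOrInProgressTriggers.apply(Wait.on(signal)) placed before the "update status" ParDo, where signal is the output of step 2. The signal must be a PCollection, not the PCollectionView returned by View.asList(), so keep a reference to step 2's PCollection before converting it to a view.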
    

  • How can I skip (2) when there is no data returned in (1)?

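    Please refer to the comment from XQ Hu. By passing readyOrInProgressTriggers to step 2 as a side input, the logic in step 2 can be skipped when that side input is empty (its count is zero). Note that the Bigtable read in step 2 still executes: a Beam pipeline graph is fixed when it is constructed, so a transform cannot be removed based on runtime data; the DoFn in step 2 simply emits nothing.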
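A minimal sketch of both suggestions, assuming the Beam Java SDK with the DirectRunner on the classpath. Create.of(...) stands in for the two BigtableHelper.getBigtableIORead(...) reads, and elements are plain String row keys instead of the trigger/Event POJOs. FilterEventsFn, UpdateStatusFn, the "key equals trigger id" matching rule and the LOG list are all hypothetical, used only to make the ordering observable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class ConditionalStepsSketch {

  // Hypothetical: records when steps (2) and (3) touch elements (works only in one JVM, e.g. DirectRunner).
  static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());

  /** Step (2), hypothetical filter: emits nothing when step (1) produced no triggers. */
  static class FilterEventsFn extends DoFn<String, String> {
    private final PCollectionView<List<String>> triggerView;

    FilterEventsFn(PCollectionView<List<String>> triggerView) {
      this.triggerView = triggerView;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      List<String> triggers = c.sideInput(triggerView);
      if (triggers.isEmpty()) {
        return; // No data from step (1): skip the step (2) logic for this row.
      }
      if (triggers.contains(c.element())) { // hypothetical matching rule
        LOG.add("event:" + c.element());
        c.output(c.element());
      }
    }
  }

  /** Step (3), hypothetical status update that only records what it processed. */
  static class UpdateStatusFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      LOG.add("update:" + c.element());
      c.output(c.element());
    }
  }

  static Pipeline build(List<String> triggerRows, List<String> eventRows) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Step (1): stand-in for the "trgr_sta" read; withCoder lets the input be empty.
    PCollection<String> triggers =
        p.apply("ReadTriggers", Create.of(triggerRows).withCoder(StringUtf8Coder.of()));
    PCollectionView<List<String>> triggerView = triggers.apply("TriggersAsList", View.asList());

    // Step (2): stand-in for the "event" read. Keep this PCollection (not only a
    // View.asList() of it) so it can serve as the Wait.on signal.
    PCollection<String> events =
        p.apply("ReadEvents", Create.of(eventRows).withCoder(StringUtf8Coder.of()))
            .apply("FilterEvents",
                ParDo.of(new FilterEventsFn(triggerView)).withSideInputs(triggerView));

    // Step (3): Wait.on releases the triggers only after the events signal is complete.
    triggers
        .apply("WaitForEvents", Wait.on(events))
        .apply("UpdateStatus", ParDo.of(new UpdateStatusFn()));
    return p;
  }
}
```

With triggers present, every "event:" entry in LOG is recorded before any "update:" entry, because in the global window of a batch pipeline Wait.on holds its main input until the whole signal has been computed. With an empty trigger input the filter emits nothing and the update step has nothing to process. In the original pipeline the same shape would be readyOrInProgressTriggers.apply(Wait.on(filteredEvents)) ahead of the "update status" ParDo, where filteredEvents is a hypothetical name for step (2)'s PCollection before View.asList().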