apache-kafkastreamsets

How to transition a StreamSets pipeline to Finished state if the Origin does not 'Produce Events'?


I have created a StreamSets pipeline where the Origin is 'Kafka Consumer' and the destination is 'JDBC Producer'. To run this pipeline, I have created a StreamSets Job.

After I click on 'Start Job' to run the pipeline, the Job status turns to 'Active' and remains in 'Active' state indefinitely even after all the data from the origin Kafka Topic is consumed and processed by inserting in the destination database.

I am trying to get the StreamSets Job to 'InActive' state once it finishes processing all the data in the Kafka topic.

The StreamSets pipeline

For my other pipelines (which has the option to 'Produce Events' when no-more-data is there), I have used a 'Pipeline Finisher Executor'.

To transition this pipeline to Finished state, I have tried the following options one by one without success: 1. Set the 'Batch Wait Time' in 'Kafka Consumer' to a lower value. 2. Set the value as -1 for 'Runner Idle Time (sec)' in General tab of the pipeline. 3. Set the value for 'Pipeline Force Stop' timeout in the 'Job Status' tab of the StreamSets Job.

Please advice how I could take the pipeline to Finished state instead of streaming continuously.


Solution

  • Here's one way of handling it. Add Jython (or Groovy or JavaScript) Evaluator in your pipeline and use state object to track and check if it's the first or subsequent batch, and if there are any records in the batch. If it's not the first batch and there aren't any records in the batch, generate a custom event and send it to Pipeline Finisher.

    Note: You will need to play with Batch Wait Time (ms) and Max Batch Size (records) depending on your use case and how quickly messages are being produced, but provided you get that nailed, this will work.

    Init Script: state['first_batch'] = "true"

    Script:

    if (state['first_batch'] == "false" and len(records) == 0):
      sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
      sdc.toEvent(sdc.createEvent("no-more-messages", 0))
    
    for record in sdc.records:
      try:
        sdc.output.write(record)
      except Exception as e:
        # Send record to error
        sdc.error.write(record, str(e))
    
    if (state['first_batch'] == "true" and len(records) > 0):
      state['first_batch'] = "false"
    

    enter image description here

    enter image description here

    enter image description here

    Cheers, Dash