
How do I use requestShutdown and shutdown to perform a graceful shutdown with the KCL Java library for AWS Kinesis?


I am trying to use the new graceful-shutdown feature of the KCL Java library for AWS Kinesis. I register a JVM shutdown hook that should stop all the record processors and then the worker gracefully. The new library provides a new interface that record processors need to implement. But how does it get invoked?

I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is this the intended way to use them? What is the point of calling both, and what is the benefit?


Solution

  • Starting a consumer

    As you may know, when you create and run a Worker, it

    1) creates the consumer offset (lease) table in DynamoDB

    2) creates leases, and schedules the lease taker and lease renewer to run at configured intervals

    If your stream has two partitions (shards), there will be two records in the same DynamoDB table, because each partition needs its own lease.

    For example:

    {
      "checkpoint": "TRIM_HORIZON",
      "checkpointSubSequenceNumber": 0,
      "leaseCounter": 38,
      "leaseKey": "shardId-000000000000",
      "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
      "ownerSwitchesSinceCheckpoint": 0
    }
    
    {
      "checkpoint": "49570828493343584144205257440727957974505808096533676050",
      "checkpointSubSequenceNumber": 0,
      "leaseCounter": 40,
      "leaseKey": "shardId-000000000001",
      "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
      "ownerSwitchesSinceCheckpoint": 0
    }
    

    3) Then, for each partition in the stream, the Worker creates an internal partition consumer (ShardConsumer), which fetches the records and dispatches them to your RecordProcessor#processRecords; see ProcessTask#call.

    4) To your question: you register your IRecordProcessorFactory implementation with the Worker, which uses it to create one record processor for each partition consumer.

    For example (this may be helpful):

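    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
    import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
    import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
    import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
    import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
    import com.amazonaws.services.kinesis.model.Record;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    
    // getAuthProfileCredentials() and getHttpConfiguration() are the author's own helpers (not shown);
    // they return an AWSCredentialsProvider and a ClientConfiguration respectively.
    KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
            "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
            .withKinesisClientConfig(getHttpConfiguration())
            // TRIM_HORIZON = start from the oldest available record; LATEST = start from the tip of the stream
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
    
    Worker consumerWorker = new Worker.Builder()
            .recordProcessorFactory(new DavidsEventProcessorFactory())
            .config(streamConfig)
            // the builder expects an AmazonDynamoDB client, not the document-API DynamoDB wrapper
            .dynamoDBClient(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))
            .build();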
    
    
    public class DavidsEventProcessorFactory implements IRecordProcessorFactory {
    
        private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);
    
        @Override
        public IRecordProcessor createProcessor() {
            logger.info("Creating an EventProcessor.");
            return new DavidsEventPartitionProcessor();
        }
    }
    
    class DavidsEventPartitionProcessor implements IRecordProcessor {
    
        private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);
    
        //TODO add consumername ?
    
        private String partitionId;
    
        // TERMINATE means the shard has ended, e.g. after resharding
        private static final ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;
    
        @Override
        public void initialize(InitializationInput initializationInput) {
            this.partitionId = initializationInput.getShardId();
            logger.info("Initialised partition {} for streaming.", partitionId);
        }
    
        @Override
        public void processRecords(ProcessRecordsInput recordsInput) {
            List<Record> records = recordsInput.getRecords();
            records.forEach(nativeEvent -> {
                // decode a duplicate so the record's ByteBuffer position is left untouched
                String eventPayload = StandardCharsets.UTF_8.decode(nativeEvent.getData().duplicate()).toString();
                logger.info("Processing an event {} : {}", nativeEvent.getSequenceNumber(), eventPayload);
            });
    
            // checkpoint once per batch: checkpoint() with no arguments records
            // the position of the last record delivered in this batch
            if (!records.isEmpty() && checkpoint(recordsInput.getCheckpointer())) {
                logger.debug("Persisted the consumer offset to {} for partition {}",
                        records.get(records.size() - 1).getSequenceNumber(), partitionId);
            }
        }
    
        @Override
        public void shutdown(ShutdownInput shutdownInput) {
            logger.debug("Shutting down event processor for {}", partitionId);
    
            // on TERMINATE you must checkpoint so the child shards can be processed;
            // on ZOMBIE the lease has been lost and checkpointing is not allowed
            if (shutdownInput.getShutdownReason() == RE_PARTITIONING) {
                checkpoint(shutdownInput.getCheckpointer());
            }
        }
    
        // returns true if the checkpoint was persisted
        private boolean checkpoint(IRecordProcessorCheckpointer checkpointer) {
            try {
                checkpointer.checkpoint();
                return true;
            } catch (ThrottlingException | KinesisClientLibDependencyException e) {
                logger.warn("Transient error while checkpointing partition {}.", partitionId, e);
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
            } catch (ShutdownException e) {
                logger.error("Consumer shutting down", e);
            }
            return false;
        }
    }
    

    // then start the consumer; run() blocks the calling thread until the worker shuts down

    consumerWorker.run();
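    Steps 3 and 4 can be modeled in plain JDK code to show how the factory relates to the per-partition consumers: the worker asks the factory for exactly one processor per shard, initializes it with the shard id, and then hands it every batch fetched for that shard. RecordProcessor, RecordProcessorFactory, ToyWorker and CollectingProcessor are hypothetical names that only mirror the shape of the KCL interfaces. This is a sketch, not the KCL API, and it ignores leases, threading and checkpointing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for KCL's IRecordProcessor (not the real API).
interface RecordProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
}

// Hypothetical stand-in for KCL's IRecordProcessorFactory.
interface RecordProcessorFactory {
    RecordProcessor createProcessor();
}

// Simplified model: one processor per shard, created lazily from the factory.
class ToyWorker {
    private final RecordProcessorFactory factory;
    private final Map<String, RecordProcessor> processorsByShard = new LinkedHashMap<>();

    ToyWorker(RecordProcessorFactory factory) {
        this.factory = factory;
    }

    // Called once per fetched batch for a given shard.
    void dispatch(String shardId, List<String> batch) {
        RecordProcessor processor = processorsByShard.computeIfAbsent(shardId, id -> {
            RecordProcessor p = factory.createProcessor();
            p.initialize(id);
            return p;
        });
        processor.processRecords(batch);
    }

    int processorCount() {
        return processorsByShard.size();
    }
}

// Example processor that remembers what it saw, for demonstration only.
class CollectingProcessor implements RecordProcessor {
    String shardId;
    final List<String> seen = new ArrayList<>();

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId;
    }

    @Override
    public void processRecords(List<String> records) {
        seen.addAll(records);
    }
}
```

    Dispatching two batches for shardId-000000000000 and one for shardId-000000000001 calls the factory twice, not three times, because the processor for a shard is reused for every later batch.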
    

  • Stopping a consumer

    Now, when you want to stop your consumer instance (the Worker), you don't need to deal with each partition consumer yourself; the Worker takes care of them once you ask it to shut down.

    The more important thing about requestShutdown is that, if you want your RecordProcessor to be notified, you can also implement IShutdownNotificationAware. That way, if there is a race where your RecordProcessor is still processing events while the worker is about to shut down, the processor still gets a chance to commit (checkpoint) its offset before shutting down.

    requestShutdown returns a ShutdownFuture (a Future<Void>), which calls worker.shutdown() once the record processors have been notified.
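    The two phases described above (notify every processor and let it checkpoint, then perform the hard shutdown, and expose completion as a future) can be modeled with JDK-only code. GracefulWorkerModel, ShutdownNotificationAware and Checkpointer are hypothetical stand-ins, not KCL classes; the real ShutdownFuture also deals with leases and timeouts, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the checkpointer handed to a processor (not the KCL type).
interface Checkpointer {
    void checkpoint();
}

// Hypothetical stand-in for KCL's IShutdownNotificationAware.
interface ShutdownNotificationAware {
    void shutdownRequested(Checkpointer checkpointer);
}

// Simplified model of a worker that supports a two-phase graceful shutdown.
class GracefulWorkerModel {
    private final List<ShutdownNotificationAware> processors;
    // Records what happened, in order, so the sequence can be inspected.
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;

    GracefulWorkerModel(List<ShutdownNotificationAware> processors) {
        this.processors = processors;
    }

    // Phase 1: notify every processor and let it checkpoint.
    // Phase 2: only then perform the hard shutdown.
    // The returned future completes after both phases have finished.
    CompletableFuture<Void> requestShutdown() {
        return CompletableFuture.runAsync(() -> {
            for (int i = 0; i < processors.size(); i++) {
                String shard = "shard-" + i;
                processors.get(i).shutdownRequested(() -> events.add("checkpoint " + shard));
            }
            shutdown();
        });
    }

    // Hard stop: in this model it just flips a flag and records the event.
    void shutdown() {
        running = false;
        events.add("shutdown");
    }

    boolean isRunning() {
        return running;
    }
}
```

    The ordering is the point of the design: every processor's checkpoint is recorded before "shutdown", which is the guarantee a bare worker.shutdown() call does not give you.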

    To be notified when requestShutdown() is called, implement the following method on your RecordProcessor:

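    class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {
    
        private String partitionId;
    
        // initialize, processRecords and shutdown as above
    
        @Override
        public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
            logger.debug("Shutdown requested for {}", partitionId);
            // last chance to record progress before this processor is shut down
            try {
                checkpointer.checkpoint();
            } catch (ThrottlingException | KinesisClientLibDependencyException
                    | InvalidStateException | ShutdownException e) {
                logger.error("Could not checkpoint partition {} on shutdown request.", partitionId, e);
            }
        }
    
    }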
    

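    But if the worker loses the lease for a partition before the notification is delivered, shutdownRequested might not be called for that processor.

    Summary of your questions

    The new library provides a new interface which record processors need to implement. But how does it get invoked?

    The Worker invokes it: when you call worker.requestShutdown(), each record processor that also implements IShutdownNotificationAware gets shutdownRequested(checkpointer) called before it is shut down.

    I tried invoking worker.requestShutdown() first and then worker.shutdown(), and it works. But is this the intended way to use them?

    You should use requestShutdown() for a graceful shutdown, since it handles the race condition described above; it was introduced in kinesis-client 1.7.1. Because the ShutdownFuture it returns calls worker.shutdown() for you, waiting on that future should be enough, and a separate worker.shutdown() call is not needed.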
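    For the original use case (a JVM shutdown hook), here is a minimal sketch, assuming KCL 1.7.x where Worker#requestShutdown() returns a Future<Void>. GracefulShutdownHook, awaitGracefulShutdown and register are hypothetical helper names, not part of the KCL. The helper depends only on the JDK, so it takes the shutdown request as a Supplier and waits on the returned future with a timeout, so a stuck shutdown cannot block JVM exit forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper (not a KCL class) for wiring a graceful shutdown into the JVM.
final class GracefulShutdownHook {
    private GracefulShutdownHook() {
    }

    // Requests a graceful shutdown and waits up to the timeout for it to finish.
    // Returns true if it completed in time, false on timeout, failure or interruption.
    static boolean awaitGracefulShutdown(Supplier<? extends Future<?>> requestShutdown,
                                         long timeout, TimeUnit unit) {
        Future<?> future = requestShutdown.get();
        try {
            future.get(timeout, unit);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    // Registers a JVM shutdown hook; hooks run on normal exit, System.exit, SIGINT and SIGTERM,
    // but not on SIGKILL.
    static Thread register(Supplier<? extends Future<?>> requestShutdown, long timeout, TimeUnit unit) {
        Thread hook = new Thread(() -> {
            boolean done = awaitGracefulShutdown(requestShutdown, timeout, unit);
            System.err.println(done ? "Graceful shutdown completed"
                                    : "Graceful shutdown did not complete in time");
        }, "kcl-graceful-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

    With the worker from the answer, you would call GracefulShutdownHook.register(consumerWorker::requestShutdown, 20, TimeUnit.SECONDS) before consumerWorker.run(); the 20-second timeout is an arbitrary example value and should be longer than your slowest batch plus one checkpoint.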