javaamazon-kinesisamazon-kcl

AWS Kinesis Enhanced fan-out Java example


I have an application that consumes records from Kinesis streams and processes them further but the performance is quite low, So now I am planning to migrate to the Kinesis Enhanced fan-out consumer using KCL 2.x to improve its performance. As the Aws Kinesis docs for the enhanced fan-out is quite confusing, can someone help me with an example of how I can implement this consumer feature in my Java application?


Solution

  • Here is a very detailed example of a KCL 2.x consumer: https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html

    The most important parts are:

    Scheduler scheduler = new Scheduler(
                    configsBuilder.checkpointConfig(),
                    configsBuilder.coordinatorConfig(),
                    configsBuilder.leaseManagementConfig(),
                    configsBuilder.lifecycleConfig(),
                    configsBuilder.metricsConfig(),
                    configsBuilder.processorConfig(),
                    configsBuilder.retrievalConfig()
            );
    

    The important part above is that the default retrievalConfig() is specified, which configures Enhanced fan-out consumer under the hood. The explicit way is the following:

    Scheduler scheduler = new Scheduler(
                    configsBuilder.checkpointConfig(),
                    configsBuilder.coordinatorConfig(),
                    configsBuilder.leaseManagementConfig(),
                    configsBuilder.lifecycleConfig(),
                    configsBuilder.metricsConfig(),
                    configsBuilder.processorConfig(),
                    configsBuilder.retrievalConfig()
                           .retrievalSpecificConfig(
                               new FanOutConfig(kinesisClient)
                                 .streamName(streamName)
                                 .applicationName(appName)
                               )
                           .maxListShardsRetryAttempts(maxListShardsRetryAttempts)
                           .initialPositionInStreamExtended(
           InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
                     )
            );