AWS Kinesis seems to be made for use with Lambdas, with the concept of shards tied in with bandwidth and latency - but I'm trying to read it from a non-Lambda service, a long-running process which would ultimately call get_records() in a loop.
But apparently there's some manual work needed to correctly read from all the shards, including handling resharding and checkpointing, which I think is under-documented. My question is: how to correctly read from all the shards in a Kinesis stream from a single long-running Python boto3 client and handle all the special steps?
> AWS Kinesis seems to be made for use with Lambdas.

Kinesis predates Lambda by a year.

> My question is: how to correctly read from all the shards in a Kinesis stream from a single long-running Python boto3 client and handle all the special steps?
The brief outline is this:
Each record carries a `SequenceNumber`. It's a string that can be converted to a decimal number, and it's guaranteed to be unique within a single shard, but not across shards.
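Since real sequence numbers are long decimal strings of varying length, compare them as integers rather than as strings if you ever need to order two of them from the same shard. A tiny illustration with made-up values:

```python
# Hypothetical sequence numbers; real ones are opaque decimal strings of
# varying length, so lexicographic comparison is unreliable.
older = "49590338271490256608559692538361571095921575989136588898"
newer = "149590338271490256608559692538361571095921575989136588898"

assert int(newer) > int(older)  # numeric comparison gives the right order
assert newer < older            # string comparison gets it backwards here
```

Given all these constraints, your application should do this: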
1. Get a list of the existing shards in your stream by calling `ListShards`.
2. Read the records from the shards by calling `GetShardIterator` first and then `GetRecords` in a sequential loop. The result of every `GetRecords` call is a batch of messages for the shard, the next iterator token and, possibly, a list of child shards, which is returned when the current shard is exhausted.
3. When calling `GetShardIterator`, use `AFTER_SEQUENCE_NUMBER` and provide the checkpointed sequence number that you last processed (see below). For a shard you have no checkpoint for, use `AT_SEQUENCE_NUMBER` with the starting sequence number returned by `ListShards` (or `TRIM_HORIZON` to read from the oldest available record).
4. `GetRecords` might return no records, which is normal. If that's the case, you might want to introduce a delay before the next call.
5. Process the messages returned by `GetRecords`. It's up to your application how to define the retry logic if some of the messages fail to process. The Kinesis ideology assumes that message ordering is important, so you might process messages one by one, even if they are returned in a single batch. If a message cannot be processed after a number of retries, it might make sense to drop it and move on.
6. Once a message is processed, you should checkpoint your position in the shard. This is a fancy way of saying that you should write down the shard id and the sequence number of the last processed message somewhere: a database, a file on disk, a key-value store or the like. If your application dies, the next time it restarts it will resume processing the shard from the last checkpointed position. You might want to checkpoint less often than once per message, as long as you're fine with double processing in the rare case that your application dies and restarts.
7. Once a shard is exhausted (which can only happen after it's been split or merged), meaning you have processed its last record, record this fact in your checkpoint store so that you can remove the shard from your processing list.
8. It's up to your application how to parallelize the load between the shards, as long as you mind the ordering constraint: if shards 1 and 2 were merged into shard 3, then 1 and 2 can be processed in parallel, but 3 can only be processed after both 1 and 2 are exhausted. This is the hardest part of the business. You might want to take some inspiration for implementing it from here. A single-threaded sketch that puts all these steps together follows below.
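Here is a minimal single-threaded sketch of the loop, assuming a hypothetical stream called `my-stream`, an in-memory dict as the checkpoint store, and `TRIM_HORIZON` as the starting position for shards without a checkpoint. Error handling, `ListShards` pagination and iterator expiry (a shard iterator is only valid for five minutes) are left out:

```python
import time

import boto3

STREAM = "my-stream"   # hypothetical stream name
POLL_DELAY = 1.0       # GetRecords allows at most 5 calls per second per shard

kinesis = boto3.client("kinesis")

# Checkpoint store: shard id -> last processed sequence number, or the
# sentinel "SHARD_END" once a shard is exhausted. In-memory here; a real
# application would persist this in a database, a file or a key-value store.
checkpoints = {}


def new_iterator(shard_id):
    """Resume after the checkpoint if one exists, else start from the oldest record."""
    if shard_id in checkpoints:
        resp = kinesis.get_shard_iterator(
            StreamName=STREAM, ShardId=shard_id,
            ShardIteratorType="AFTER_SEQUENCE_NUMBER",
            StartingSequenceNumber=checkpoints[shard_id])
    else:
        resp = kinesis.get_shard_iterator(
            StreamName=STREAM, ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON")
    return resp["ShardIterator"]


def parents_done(shard, known_ids):
    """A child shard may be read only after all of its parents are exhausted."""
    for key in ("ParentShardId", "AdjacentParentShardId"):
        parent = shard.get(key)
        if parent and parent in known_ids and checkpoints.get(parent) != "SHARD_END":
            return False
    return True


def process(record):
    # Application-specific logic; retries and poison-message handling go here.
    print(record["SequenceNumber"], record["Data"])


def consume():
    iterators = {}   # shard id -> current shard iterator
    while True:
        # Discover shards and start every shard whose parents are fully
        # processed. Re-listing on every pass keeps the sketch short; real
        # code would re-list only occasionally or when a shard closes.
        shards = kinesis.list_shards(StreamName=STREAM)["Shards"]
        known = {s["ShardId"] for s in shards}
        for shard in shards:
            sid = shard["ShardId"]
            if (sid not in iterators
                    and checkpoints.get(sid) != "SHARD_END"
                    and parents_done(shard, known)):
                iterators[sid] = new_iterator(sid)

        for sid, it in list(iterators.items()):
            resp = kinesis.get_records(ShardIterator=it, Limit=1000)
            for record in resp["Records"]:
                process(record)
                # Checkpointing after every record keeps the example simple;
                # checkpoint less often if double processing is acceptable.
                checkpoints[sid] = record["SequenceNumber"]
            if resp.get("NextShardIterator") is None:
                # The shard was closed by a reshard and its last record has
                # been read; its children become eligible on the next pass.
                checkpoints[sid] = "SHARD_END"
                del iterators[sid]
            else:
                iterators[sid] = resp["NextShardIterator"]

        time.sleep(POLL_DELAY)  # also covers the "no records returned" case
```

Instead of re-listing the shards on every pass, you could seed new iterators from the `ChildShards` field of the last `GetRecords` response for a closed shard; re-listing just keeps the merge case (a child with two parents) easy to get right.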
AWS has a product specifically designed for that, called KCL (Kinesis Client Library).
Unfortunately, it has a hard dependency on DynamoDB as a checkpoint store and can only natively integrate with Java code (although its MultiLangDaemon can run record processors written in any language, communicating with them over standard process streams).