I'm processing records from a Kinesis stream with Python's boto3 client, and every piece of documentation I've been able to find only reads from the first shard. The code usually looks like this:
    response = self.kinesis_client.get_shard_iterator(
        StreamName=self.name,
        ShardId=self.details["Shards"][0]["ShardId"],  # <-- this
        ShardIteratorType="LATEST",
    )
    shard_iter = response["ShardIterator"]
    while True:
        response = self.kinesis_client.get_records(
            ShardIterator=shard_iter, Limit=10
        )
        process_records(response["Records"])
        try:
            shard_iter = response["NextShardIterator"]
        except KeyError:
            break
But in my case, there are 15 shards in the stream, configured by someone else, and the goal is for this consumer to read ALL the messages in the stream. When I read just from the first shard, the get_records() call returns a response without the NextShardIterator, so I'm guessing I have to pull from multiple shards. Is that assumption correct?
What would be the best practice here? Creating a thread for each of the shards and reading them in parallel? The processing is fast enough to process everything in that stream.
Is reading from multiple shards required to process all data in the AWS Kinesis stream?
TL;DR: yes.
Kinesis is a durable event streaming solution. The state of its events (whether they are processed or not) is separated from their data. The data is immutable, and the state is not a concern of Kinesis at all. This allows having multiple independent subscribers to each stream.
Once Kinesis has ingested your events, it doesn't know (or care) whether or not you have processed them, or whether you are going to process them again. A Kinesis stream stores the records for the duration of the stream's retention period (24 hours by default, but it can be extended to up to a year). There's a pretty strong guarantee that once you have received a response that the record is there, it's there (i.e. it can eventually be read within the retention period). The records have an intrinsic order: if a record B comes after A, all the readers will get B after A. You can even introduce strong ordering of the records on the client side by putting them into a causal relationship: in a loop, take the sequence number from the response for the previous record, and use it as SequenceNumberForOrdering for the next one.
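A minimal sketch of that loop, in case it helps. The function name put_in_order is made up for this example; kinesis_client is assumed to be a boto3 Kinesis client (or anything with a compatible put_record method), and all records are assumed to share one partition key.

```python
def put_in_order(kinesis_client, stream_name, partition_key, payloads):
    """Write payloads one by one, chaining each record to the previous one.

    Hypothetical helper: returns the sequence numbers Kinesis assigned.
    """
    previous_seq = None
    sequence_numbers = []
    for payload in payloads:
        kwargs = {
            "StreamName": stream_name,
            "Data": payload,
            "PartitionKey": partition_key,
        }
        # The first record has no predecessor, so the parameter is omitted.
        if previous_seq is not None:
            kwargs["SequenceNumberForOrdering"] = previous_seq
        response = kinesis_client.put_record(**kwargs)
        previous_seq = response["SequenceNumber"]
        sequence_numbers.append(previous_seq)
    return sequence_numbers
```

With a real client this would be called as put_in_order(boto3.client("kinesis"), "my-stream", "some-key", [b"a", b"b"]), where the stream name and key are placeholders.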
These kinds of guarantees come at a price (both monetary and computational), which is why Kinesis streams are broken into shards. Each shard has its own throughput limits, and you pay for every shard separately. The ordering guarantee also applies only to records within a single shard.
To balance cost against throughput, Kinesis allows dynamically changing the number of active shards within a single stream without having to change your producer settings. As far as the producer is concerned, it just needs to send PutRecord requests to a particular stream. The stream will distribute them among the shards automatically, based on each record's partition key.
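To make the distribution a bit more concrete: as far as I know, Kinesis takes the MD5 hash of the record's partition key as a 128-bit integer and routes the record to the open shard whose HashKeyRange contains that value. A rough sketch of that lookup follows. shard_for_key is a hypothetical helper, the shard dictionaries are assumed to have the shape returned by list_shards, and hashing the key as UTF-8 bytes is my assumption.

```python
import hashlib


def shard_for_key(partition_key, open_shards):
    """Return the ShardId whose hash key range covers the partition key.

    Hypothetical helper; open_shards should contain only open shards,
    because closed parent shards keep their old hash key ranges.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    hash_key = int(digest, 16)  # 128-bit unsigned integer
    for shard in open_shards:
        key_range = shard["HashKeyRange"]
        start = int(key_range["StartingHashKey"])
        end = int(key_range["EndingHashKey"])
        if start <= hash_key <= end:
            return shard["ShardId"]
    return None  # no shard in the list covers this key
```

You never need to do this yourself to write records. It is only meant to show why records with the same partition key end up in the same shard.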
To do this seamlessly, without stopping the stream, while preserving the ordering, Kinesis supports resharding: it changes the number of shards through operations known as shard splitting and shard merging. Both operations make the stream stop writing to the old shard(s), mark them as closed, and start writing to the new shard(s). The split or merge itself is also persisted as parent-child links between shards, and all the records in the child shards are considered to have happened after all the records in the parent shards. This is particularly important for shard merges: you should not start processing the records in the child shard until you have processed all the records in both parent shards.
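Here is a sketch of how a consumer could discover every shard and work out a safe processing order. It assumes a boto3-style client whose list_shards response contains "Shards" and an optional "NextToken", and shard entries that carry ParentShardId and AdjacentParentShardId when they result from a split or merge. The function names are made up for this example.

```python
def list_all_shards(kinesis_client, stream_name):
    """Collect every shard of the stream, following NextToken pagination."""
    shards = []
    kwargs = {"StreamName": stream_name}
    while True:
        response = kinesis_client.list_shards(**kwargs)
        shards.extend(response["Shards"])
        token = response.get("NextToken")
        if not token:
            return shards
        # Follow-up pages are requested with the token only.
        kwargs = {"NextToken": token}


def parents_first(shards):
    """Order shard IDs so that every parent comes before its children."""
    by_id = {shard["ShardId"]: shard for shard in shards}
    ordered, done = [], set()

    def visit(shard):
        if shard["ShardId"] in done:
            return
        for key in ("ParentShardId", "AdjacentParentShardId"):
            # A parent may be missing if it has already aged out of the stream.
            parent = by_id.get(shard.get(key))
            if parent is not None:
                visit(parent)
        done.add(shard["ShardId"])
        ordered.append(shard["ShardId"])

    for shard in shards:
        visit(shard)
    return ordered
```

Processing the shards strictly in this order is the simplest way to honor the parent-before-child rule. Shards with no unprocessed parents can also be read concurrently.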
Since Kinesis doesn't track the event status, it becomes the client's responsibility: your processor should keep track of its progress ("checkpoints") through each shard, as well as of the splits and merges. It's not too hard, but not completely trivial either.
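A minimal sketch of reading one shard with a checkpoint, assuming a boto3-style client. read_shard, handle, max_empty_polls and poll_delay are names invented for this example, and where you persist the returned checkpoint (a file, a database) is up to you. A missing NextShardIterator means the shard was closed by a split or merge and has been read to its end, which is what the code in the question ran into.

```python
import time


def read_shard(kinesis_client, stream_name, shard_id, handle,
               checkpoint=None, max_empty_polls=None, poll_delay=1.0):
    """Read a shard from the checkpoint onward.

    Returns (last_sequence_number, shard_is_closed).
    """
    if checkpoint is None:
        # No progress recorded yet: start from the oldest retained record.
        iterator_args = {"ShardIteratorType": "TRIM_HORIZON"}
    else:
        iterator_args = {
            "ShardIteratorType": "AFTER_SEQUENCE_NUMBER",
            "StartingSequenceNumber": checkpoint,
        }
    shard_iter = kinesis_client.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id, **iterator_args
    )["ShardIterator"]

    empty_polls = 0
    while True:
        response = kinesis_client.get_records(ShardIterator=shard_iter, Limit=100)
        for record in response["Records"]:
            handle(record)
            checkpoint = record["SequenceNumber"]  # persist this in real code
        shard_iter = response.get("NextShardIterator")
        if shard_iter is None:
            return checkpoint, True  # closed shard, fully consumed
        if response["Records"]:
            empty_polls = 0
            continue
        empty_polls += 1
        if max_empty_polls is not None and empty_polls >= max_empty_polls:
            return checkpoint, False  # caught up for now, shard still open
        time.sleep(poll_delay)  # stay under the per-shard read limits
```

When the second return value is True, the shard's children (if any) become eligible for processing.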
AWS provides a library called KCL (Kinesis Client Library) which does it for you. Unfortunately, it's only available in Java and has a hard dependency on DynamoDB as a store for the processing state. It can, however, invoke record processors written in other languages as subprocesses.
What would be the best practice here? Creating a thread for each of the shards and reading them in parallel? The processing is fast enough to process everything in that stream.
That's one way to do it. However, you should mind splits and merges and rebalance your threads accordingly.
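For the thread-per-shard approach, a minimal sketch using only the standard library could look like this. read_shards_in_parallel and read_one are hypothetical names: read_one is whatever function you use to consume a single shard, taking a shard ID and returning when it is done. The sketch does not handle resharding. It only fans out over a fixed list of shard IDs, so you would rerun it with the child shards once their parents are finished.

```python
from concurrent.futures import ThreadPoolExecutor


def read_shards_in_parallel(shard_ids, read_one):
    """Run read_one(shard_id) in its own thread for every shard.

    Returns a dict mapping each shard ID to whatever read_one returned.
    """
    shard_ids = list(shard_ids)
    if not shard_ids:
        return {}
    with ThreadPoolExecutor(max_workers=len(shard_ids)) as pool:
        futures = {
            shard_id: pool.submit(read_one, shard_id) for shard_id in shard_ids
        }
        # result() re-raises any exception from the worker thread.
        return {shard_id: future.result() for shard_id, future in futures.items()}
```

Threads are a reasonable fit here because the work is mostly waiting on network calls.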
If you use KCL, it will balance the mappings ("leases") between shards and workers automatically.