redis redis-streams

How to define TTL for redis streams?


I have two microservices and I need to implement reliable notifications between them. I thought about using Redis streams: serviceA will send a request to serviceB with an identifier X. Once serviceB finishes the work serviceA asked for, it'll add a new item to a stream (the stream is specific to X) to let serviceA know it's done.

ServiceA can send multiple requests, each request may have a different identifier. So it'll block for new elements in different streams.

My question is how can I delete streams which are no longer needed, depending on their age. For example I'd like to have streams that were created over a day ago deleted. Is this possible?

If it's not, I'd love to hear any ideas you have as to how not to have unneeded streams in redis.

Thanks


Solution

  • There's no straightforward way to delete older entries based on TTL or age. You can combine XTRIM or XDEL with other commands to trim the stream.

    Let's see how we can use XTRIM:

    XTRIM stream MAXLEN ~ SIZE

    XTRIM trims the stream to a given number of items, evicting older items (items with lower IDs) if needed.

    Record the stream's size with the XLEN command every day (or at whatever interval your delete policy calls for) and store it somewhere.

    Then run a periodic job that calls XTRIM as

    XTRIM x-stream MAXLEN ~ (NEW_SIZE - PREVIOUS_SIZE)

    For example, if the stream size was 500 yesterday and is 600 now, we need to delete the 500 older entries and keep the 100 new ones, so we can just run

    XTRIM x-stream MAXLEN ~ 100

    You can run the job on whatever schedule fits your deletion policy: daily, weekly, twice a week, etc.
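
    The periodic job described above could be sketched roughly as follows. This is only an illustration, not a definitive implementation: it assumes a client object exposing `xlen` and `xtrim` the way redis-py's `redis.Redis` does, and the function name `trim_to_recent` is hypothetical. It also assumes the size stored for the next run should be the length measured after trimming, which the description above leaves open.

```python
def trim_to_recent(client, stream, previous_size):
    """Keep roughly the entries added since the last run.

    `client` is assumed to behave like redis-py's redis.Redis
    (xlen / xtrim). Returns the stream length after trimming, to be
    stored as `previous_size` for the next run (an assumption).
    """
    current_size = client.xlen(stream)

    # NEW_SIZE - PREVIOUS_SIZE = number of entries added since the last run.
    keep = current_size - previous_size
    if keep < 0:
        # The stream shrank in the meantime (e.g. trimmed elsewhere),
        # so there is nothing older than the baseline to remove.
        keep = current_size

    # approximate=True corresponds to "MAXLEN ~": Redis may keep a few
    # more entries than requested.
    client.xtrim(stream, maxlen=keep, approximate=True)

    # Measure again instead of trusting `keep`, because the trim is approximate.
    return client.xlen(stream)


# Usage sketch (requires a running Redis server and the redis-py package):
#   import redis
#   client = redis.Redis()
#   baseline = trim_to_recent(client, "x-stream", previous_size=500)
```

    With the numbers from the example, a stream of 600 entries and a stored size of 500 results in a trim to about 100 entries.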


    XDEL stream ID [ID...]

    Removes the specified entries from a stream and returns the number of entries deleted, which may differ from the number of IDs passed to the command if some of the IDs do not exist.

    So whenever Service B consumes an event, the service itself can delete the stream entry, since Service B knows the entry's ID. This stops working as soon as you start using consumer groups, though, because other consumers may still need to read the entry. In that case I would use a Redis set or a Redis hash to track the acknowledged stream IDs and run a periodic sweep job to clean up the stream.

    For example

    Service A sends a stream item with ID1 to Service B. After consuming the item, Service B acknowledges it by recording it in the map: ack_stream = { ID1: true }. You can track other data as well, e.g. a count when consumer groups are involved.

    A sweep job then runs periodically, for example at 1 AM daily, reads all the elements of ack_stream, and picks out the items that are due for deletion. It can then call XDEL in batches with that set of stream IDs.
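
    A minimal sketch of such a sweep job, under a few assumptions: the acknowledged IDs are kept in a Redis set (rather than a hash) under the key ack_stream, the stream uses auto-generated IDs of the form `<milliseconds>-<sequence>` so an entry's age can be read from its ID, and "due for deletion" means older than a chosen age. The client is assumed to expose `smembers`, `xdel` and `srem` like redis-py's `redis.Redis`; the function names are hypothetical.

```python
import time

ACK_KEY = "ack_stream"  # set of acknowledged entry IDs (name from the example above)


def entry_age_ms(entry_id, now_ms):
    """Age of an auto-generated stream ID ('<ms>-<seq>') in milliseconds."""
    ms_part = entry_id.split("-", 1)[0]
    return now_ms - int(ms_part)


def sweep_acked(client, stream, max_age_ms, batch_size=100, now_ms=None):
    """Delete acknowledged entries older than max_age_ms; return how many XDEL removed."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)

    # redis-py returns bytes unless decode_responses=True, so normalise to str.
    acked = [i.decode() if isinstance(i, bytes) else i
             for i in client.smembers(ACK_KEY)]

    # Assumed deletion policy: acknowledged AND older than the cutoff.
    expired = sorted(i for i in acked if entry_age_ms(i, now_ms) >= max_age_ms)

    deleted = 0
    for start in range(0, len(expired), batch_size):
        batch = expired[start:start + batch_size]
        # XDEL returns the number of entries actually removed, which can be
        # lower than len(batch) if some IDs no longer exist.
        deleted += client.xdel(stream, *batch)
        # Drop the swept IDs from the tracking set as well.
        client.srem(ACK_KEY, *batch)
    return deleted


# Usage sketch (requires a running Redis server and the redis-py package):
#   import redis
#   client = redis.Redis(decode_responses=True)
#   sweep_acked(client, "x-stream", max_age_ms=24 * 60 * 60 * 1000)
```

    Entries that were never acknowledged are left alone here, so a separate policy would be needed for items that no consumer ever picks up.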