kotlin redis spring-data-redis redis-streams

Get pending messages with Redis Streams and Spring Data


I use Redis Streams in my Spring Boot application. In a scheduler, I regularly want to fetch all pending messages, check how long they have been processing, and re-trigger them if necessary.

I can get the pending messages, but I'm not sure how to get their payload.

My first approach used the pending and range operations. The downside is that range does not increase totalDeliveryCount, so I cannot use the range method.

val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
    if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
        return@filter true
    } else {
        ...
        return@filter false
    }
}.map { //map from PendingMessage::class to a MapRecord with the content
    val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
    if (map != null && map.size > 0) { 
        return@map map[0]
    } else {
        return@map null
    }
}.filterNotNull().toList()
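
The filter condition in the snippet above can be pulled out into a pure function, which makes it easy to test without a Redis instance. This is only a sketch: the function name `shouldRetrigger` and its parameter names are hypothetical, and the values are assumed to come from `PendingMessage.totalDeliveryCount` and `PendingMessage.elapsedTimeSinceLastDelivery`.

```kotlin
import java.time.Duration

// Hypothetical helper (not part of spring-data-redis): decides whether a
// pending message should be re-triggered. A message qualifies when it has
// not yet used up its delivery attempts and has been idle for longer than
// the configured timeout.
fun shouldRetrigger(
    totalDeliveryCount: Long,
    elapsedSinceLastDelivery: Duration,
    maxDeliveryAttempts: Long,
    pendingTimeout: Duration
): Boolean =
    totalDeliveryCount < maxDeliveryAttempts &&
        elapsedSinceLastDelivery > pendingTimeout
```

Inside the `filter` lambda this could be called with the values of each `pendingMessage`, with `Duration.ofMillis(pendingTimeout.toLong())` passed as the timeout.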

My second approach used the pending and read operations. For the read operation I can specify an offset with the current ID. The problem is that I only get back IDs that are higher than the one specified.

val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
    if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
        return@filter true
    } else {
        ...
        return@filter false
    }
}.map { //map from PendingMessage::class to a MapRecord with the content
    val map = stringRedisTemplate.opsForStream<String, Any>()
            .read(it.consumer, StreamReadOptions.empty().count(1),
                    StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
    if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
        return@map map[0]
    } else {
        return@map null
    }
}.filterNotNull().toList()

So when I use ReadOffset.from('1234-0'), I don't get the message with ID 1234-0 but everything after it. Is there a way to get the exact message while also honoring the totalDeliveryCount and elapsedTimeSinceLastDelivery statistics?

I'm using spring-data-redis 2.3.1.RELEASE.


Solution

  • I'm using the following workaround now, which should be good enough for most cases:

    return if (id.sequence > 0) {
        "${id.timestamp}-${id.sequence - 1}"
    } else {
        "${id.timestamp - 1}-99999"
    }
    

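    It computes the ID just before the pending message, so that a read starting at that offset returns the pending message first. It relies on the assumption that no more than 99999 messages are inserted per ms.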
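
The workaround above can be sketched as a standalone function that works on the string form of a stream ID. The function name `previousStreamId` and the `maxSequence` parameter are hypothetical, and the default of 99999 is the same assumption about the per-millisecond insert rate made above, not a limit imposed by Redis.

```kotlin
// Hypothetical helper: returns a stream ID that sorts immediately before `id`,
// assuming sequence numbers within one millisecond never exceed `maxSequence`.
fun previousStreamId(id: String, maxSequence: Long = 99_999): String {
    val parts = id.split("-")
    require(parts.size == 2) { "Expected an ID of the form <timestamp>-<sequence>, got '$id'" }
    val timestamp = parts[0].toLong()
    val sequence = parts[1].toLong()
    return if (sequence > 0) {
        // Same millisecond, previous sequence number.
        "$timestamp-${sequence - 1}"
    } else {
        // First entry of this millisecond: step back one millisecond and use
        // the assumed highest sequence number.
        "${timestamp - 1}-$maxSequence"
    }
}
```

The result could then be passed to `ReadOffset.from(...)` in place of `it.id` in the second approach from the question. Keeping the check that the returned record's ID equals `it.idAsString` still seems worthwhile, since the read returns whatever entry follows the computed offset.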