spring-webfluxspring-webclientreactor-nettylettuce

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1


I am getting the following exception while subscribing to pub-sub Redis.

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:103)
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
    at reactor.netty.http.server.HttpServerOperations.compression(HttpServerOperations.java:509)
    at reactor.netty.http.server.HttpServerOperations.afterMarkSentHeaders(HttpServerOperations.java:580)
    at reactor.netty.http.HttpOperations.lambda$then$3(HttpOperations.java:199)
    at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:115)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
    at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:341)
    at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:267)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:358)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:100)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
    at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:291)
    at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:773)
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
    at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65)
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:746)
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:681)
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)


The code I am using

reactiveMsgListenerContainer
            .receive(weatherRedisTopic)
            .onBackpressureBuffer(1000)
            .map(ReactiveSubscription.Message<String, String>::getMessage)
            .map { msg ->
                objectMapper.deserialize<List<RealtimeweatherDto>>(msg)
            }
            .subscribe(
                { realtimeweather: List<RealtimeweatherDto> ->
                    val weatherData: Map<String, RealtimeweatherDto> =
                        realtimeweather.associateBy({ it.weatherer.symbol + ":" + it.weatherer.country }, { it })
                    sink.tryEmitNext(weatherData)
                },
                { err ->
                    logger.error(err) { "Error occurred in ${weatherRedisTopic.topic} redis subscription channel" }
                }
            )

My hunch is the data I am getting in the pub-sub channel is too large there it is giving me this error. Any other suggestions? How can I increase the code size. I am using Spring webflux 2.5.1. There is spring.codec.max-in-memory-size property should I increase it.

P.S:

It seems like this is more related to WebClient we are using. We have recently increased codec size

  private val bufferedMemoryLimitInMb = 10 * 1024 * 1024
    val webClient = WebClient.builder()
        .codecs { configurer -> configurer.defaultCodecs().maxInMemorySize(bufferedMemoryLimitInMb ) }.build()


Solution

  • After migrating to Spring boot 2.5.8. this issue was resolved for me.