I am getting the following exception while subscribing to pub-sub Redis.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultFullHttpRequest.release(DefaultFullHttpRequest.java:103)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at reactor.netty.http.server.HttpServerOperations.compression(HttpServerOperations.java:509)
at reactor.netty.http.server.HttpServerOperations.afterMarkSentHeaders(HttpServerOperations.java:580)
at reactor.netty.http.HttpOperations.lambda$then$3(HttpOperations.java:199)
at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:115)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:192)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:341)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:267)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:358)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:100)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:291)
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:773)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:746)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:681)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
The code I am using
reactiveMsgListenerContainer
.receive(weatherRedisTopic)
.onBackpressureBuffer(1000)
.map(ReactiveSubscription.Message<String, String>::getMessage)
.map { msg ->
objectMapper.deserialize<List<RealtimeweatherDto>>(msg)
}
.subscribe(
{ realtimeweather: List<RealtimeweatherDto> ->
val weatherData: Map<String, RealtimeweatherDto> =
realtimeweather.associateBy({ it.weatherer.symbol + ":" + it.weatherer.country }, { it })
sink.tryEmitNext(weatherData)
},
{ err ->
logger.error(err) { "Error occurred in ${weatherRedisTopic.topic} redis subscription channel" }
}
)
My hunch is the data I am getting in the pub-sub channel is too large there it is giving me this error. Any other suggestions?
How can I increase the code size.
I am using Spring webflux 2.5.1. There is spring.codec.max-in-memory-size
property should I increase it.
P.S:
It seems like this is more related to WebClient we are using. We have recently increased codec size
private val bufferedMemoryLimitInMb = 10 * 1024 * 1024
val webClient = WebClient.builder()
.codecs { configurer -> configurer.defaultCodecs().maxInMemorySize(bufferedMemoryLimitInMb ) }.build()
After migrating to Spring boot 2.5.8. this issue was resolved for me.