project-reactor spring-mongodb changestream spring-data-mongodb-reactive

Reactor Flux throws IllegalArgumentException, suspected to be caused by bufferTimeout


I have a Spring application that builds a reactive pipeline as follows:

buildPipeline() // returns a Flux based on change stream events or Kafka receives
    .bufferTimeout(capacity, Duration.ofSeconds(1))
    .flatMap(r -> {
        Element x = r.get(r.size() - 1);
        // some processing on the element and the batch obtained (returns a Publisher)
    })
    .doOnError(e -> log.info("error occurred: " + e))
    .subscribe();

However, I see my application intermittently throwing the error below:

java.lang.IllegalArgumentException: 3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST throw a java.lang.IllegalArgumentException if argument <= 0
at com.mongodb.reactivestreams.client.internal.ObservableToPublisher$1$1.request(ObservableToPublisher.java:43)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.requestMore(FluxBufferTimeout.java:317)

I'm not able to determine what is wrong or why the stream terminates with this error. Any help would be highly appreciated.

The application started throwing this error after I added bufferTimeout to support batching; before that, I had never encountered this exception. I am also not sure how to reproduce the issue, since it does not occur locally or in UAT, only in the production environment. Any leads would be helpful.

Thanks!


Solution

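  • Try adding onBackpressureBuffer() directly before bufferTimeout(...). This operator requests unbounded demand from the source and buffers elements whenever downstream demand is low, so the request calls issued by bufferTimeout stop at the buffer instead of reaching the MongoDB publisher, and items are emitted downstream in a controlled way. The buffer is unbounded by default, so a bounded variant such as onBackpressureBuffer(maxSize) may be safer under sustained load.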
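
To illustrate why the source reacts this way, here is a minimal sketch of a publisher that enforces rule 3.9. It uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor or the MongoDB driver, and the names Rule39Demo, StrictRangePublisher and run are hypothetical. It is single-threaded and simplified, so treat it as a demonstration of the rule, not as the driver's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class Rule39Demo {

    /** Hypothetical strict source: emits 1..count and rejects non-positive demand (rule 3.9). */
    static final class StrictRangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        StrictRangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        // Rule 3.9: a non-positive request terminates the stream with an error.
                        done = true;
                        subscriber.onError(new IllegalArgumentException(
                                "3.9 violated: request(" + n + ")"));
                        return;
                    }
                    // Emit up to n items synchronously (not thread-safe; demo only).
                    long emitted = 0;
                    while (emitted < n && next <= count && !done) {
                        subscriber.onNext(next++);
                        emitted++;
                    }
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a single request of the given size and records every signal received. */
    static List<String> run(long demand) {
        List<String> signals = new ArrayList<>();
        new StrictRangePublisher(3).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { signals.add("next:" + item); }
            @Override public void onError(Throwable t) { signals.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { signals.add("complete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run(0));              // non-positive demand
        System.out.println(run(Long.MAX_VALUE)); // unbounded demand
    }
}
```

A request of 0 ends the stream with an IllegalArgumentException, whereas an unbounded request drains the source normally. Reactor's onBackpressureBuffer() requests unbounded demand from its upstream, which is why placing it between the source and bufferTimeout should keep a non-positive request from reaching the source.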