I want to write a unit test that ensures a retry mechanism is in place, so that the code under test can handle the case where the Publisher
emits an error the first time it is subscribed to.
Let's say I have a factory-style method that creates instances of MyObject wrapped in a Mono:
    public Mono<Void> createMyObject() {
        return myObjectFactory.createMyObject()
            .doOnNext(myObject -> myObject.performActionOnObject())
            .doOnError(throwable -> LOG.error("Error while creating MyObject!", throwable))
            .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(10))
                .doBeforeRetry(retrySignal -> LOG.error("Retrying to create new MyObject due to {}, attempt nr. {}...",
                    retrySignal.failure().getMessage(),
                    retrySignal.totalRetriesInARow() + 1)))
            .then();
    }
The call to myObjectFactory.createMyObject() may return a Mono.error(), so retryWhen is used to make sure such errors are handled and the creation is retried after a fixed delay (10 seconds here).
I tried to test this behaviour using reactor.test.publisher.TestPublisher and StepVerifier, modelling a scenario where the first subscription returns an error but the second one returns a result.
For example:
    @Test
    void testRetries() {
        TestPublisher<MyObject> myObjectTestPublisher = TestPublisher.create();
        Mockito.when(myObjectFactory.createMyObject()).thenReturn(myObjectTestPublisher.mono());
        MyObject myObjectMock = Mockito.mock(MyObject.class);
        StepVerifier.withVirtualTime(() -> createMyObject())
            .expectSubscription()
            .then(() -> myObjectTestPublisher.assertWasSubscribed())
            .then(() -> myObjectTestPublisher.error(new RuntimeException("Dummy error")))
            .then(() -> myObjectTestPublisher.emit(myObjectMock))
            .verifyTimeout(Duration.ofMinutes(10));
        Mockito.verify(myObjectMock, Mockito.times(1)).performActionOnObject();
    }
Yet whenever I run this test, the code gets stuck in an infinite number of retries, where the TestPublisher always returns a Mono.error() with RuntimeException("Dummy error").
So while the retry mechanism works, the TestPublisher never emits the myObjectMock, no matter how many retries are executed.
I also tried to rewrite the unit test to use a Sinks.one() instead of the TestPublisher, but I ended up with the same behaviour.
How can I tell the TestPublisher to emit a Mono.error() the first time it is subscribed to, and emit a mocked object the second time it is subscribed to (retried)?
Ah, so I found the trick.
The problem is that the TestPublisher removes all its subscribers when emitting the error (a terminal signal).
Therefore, you have to create a TestPublisher with a CLEANUP_ON_TERMINATE violation:
    TestPublisher.createColdNonCompliant(false, TestPublisher.Violation.CLEANUP_ON_TERMINATE);
Furthermore, if you're using the StepVerifier, you need to make the publisher cold; otherwise the emission of a value may be missed by the subscription triggered by the retry mechanism.
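The underlying requirement, independent of Reactor, is that the source decides what to produce each time it is asked, rather than once up front. The plain-Java sketch below models that with a Supplier that fails on its first call and returns a value afterwards. FlakySourceSketch, failOnceThen and callWithRetry are hypothetical names made up for illustration; they are not part of Reactor or of the code in the question, and the retry loop has no delay. In Reactor, a similar per-subscription decision could presumably be expressed with Mono.defer and a counter, which is a different technique from the TestPublisher approach described here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class FlakySourceSketch {

    /** Hypothetical helper: the returned source fails on its first call and succeeds afterwards. */
    public static <T> Supplier<T> failOnceThen(T value, AtomicInteger attempts) {
        return () -> {
            // The decision is made on every call, which plays the role of "per subscription".
            if (attempts.getAndIncrement() == 0) {
                throw new RuntimeException("Dummy error");
            }
            return value;
        };
    }

    /** Hypothetical stand-in for a retry operator: re-invokes the source after each failure. */
    public static <T> T callWithRetry(Supplier<T> source, int maxRetries) {
        RuntimeException last = new IllegalArgumentException("maxRetries must be >= 0");
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and ask the source again
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String result = callWithRetry(failOnceThen("myObject", attempts), 3);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

Because the counter lives outside the Supplier, a test can also assert how many times the source was invoked, which corresponds to checking that exactly one retry happened.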