java, project-reactor, reactive-streams

How to make Reactor TestPublisher return Error for the first subscription, but emit value for the next one to test Retries?


I want to write a unit test that ensures a retry mechanism is in place, so that the code under test can handle the case where the Publisher emits an error the first time it is subscribed to.

Let's say I have a factory-style method that creates instances of MyObject wrapped in a Mono:

public Mono<Void> createMyObject() {
    return myObjectFactory.createMyObject()
            .doOnNext(myObject -> myObject.performActionOnObject())
            .doOnError(throwable -> LOGGER.error("Error while creating MyObject!", throwable))
            .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(10))
                    .doBeforeRetry(retrySignal -> LOGGER.error("Retrying to create new MyObject due to {}, attempt nr. {}...", retrySignal.failure().getMessage(),
                            retrySignal.totalRetriesInARow() + 1)))
            .then();
}

myObjectFactory.createMyObject() may return a Mono.error(), so retryWhen is used to make sure such errors are handled and the creation is retried after 10 seconds.

I tried to test this behaviour using reactor.test.publisher.TestPublisher and StepVerifier, modelling a scenario where the first subscription returns an error but the second one returns a result.

For example:

@Test
void testRetries() {
    TestPublisher<MyObject> myObjectTestPublisher = TestPublisher.create();
    Mockito.when(myObjectFactory.createMyObject()).thenReturn(myObjectTestPublisher.mono());

    MyObject myObjectMock = Mockito.mock(MyObject.class);

    StepVerifier.withVirtualTime(() -> createMyObject())
            .expectSubscription()
            .then(() -> myObjectTestPublisher.assertWasSubscribed())
            .then(() -> myObjectTestPublisher.error(new RuntimeException("Dummy error")))
            .then(() -> myObjectTestPublisher.emit(myObjectMock))
            .verifyTimeout(Duration.ofMinutes(10));

    Mockito.verify(myObjectMock, Mockito.times(1)).performActionOnObject();
}

Yet whenever I run this test, the code gets stuck in an endless series of retries, because the TestPublisher always signals the RuntimeException("Dummy error") again.

So while the retry mechanism works, the TestPublisher never emits the myObjectMock, no matter how many retries are executed.

I also tried to rewrite the unit test to use a Sinks.one() instead of the TestPublisher, but I ended up with the same behaviour.

TL;DR

How can I tell the TestPublisher to emit a Mono.error() the first time it is subscribed to, and emit a mocked object the second time it is subscribed to (retried)?


Solution

  • Ah, so I found the trick.

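    The problem is that the TestPublisher removes all of its subscribers when it emits the error (a terminal signal), so every later subscription, including the ones triggered by the retry, just receives that same error again.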

    Therefore, you have to create a TestPublisher with a CLEANUP_ON_TERMINATE violation:

    TestPublisher.createColdNonCompliant(false, TestPublisher.Violation.CLEANUP_ON_TERMINATE);
    

    Furthermore, if you're using the StepVerifier, you need to make the publisher cold. Otherwise the value may be emitted before the retry mechanism has resubscribed, and the new subscription will miss it.
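If you only need "fail on the first subscription, succeed on the second", a different technique that avoids TestPublisher altogether is to wrap the source in Mono.defer and count subscriptions. The following is a minimal sketch of that alternative, not the approach from the answer above: the class name RetryOnFirstFailureSketch, the helper failingOnce, and the use of a String in place of MyObject are all hypothetical stand-ins, and the retry spec is copied from the question.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.retry.Retry;

public class RetryOnFirstFailureSketch {

    // Hypothetical stand-in for myObjectFactory.createMyObject():
    // Mono.defer re-evaluates the lambda on every subscription, so the
    // first subscription gets an error and any later one gets a value.
    static Mono<String> failingOnce(AtomicInteger subscriptions) {
        return Mono.defer(() -> subscriptions.getAndIncrement() == 0
                ? Mono.<String>error(new RuntimeException("Dummy error"))
                : Mono.just("myObject"));
    }

    // Verifies that one retry recovers from the first failure and
    // returns how many times the source was subscribed to.
    static int verifyRetry() {
        AtomicInteger subscriptions = new AtomicInteger();

        StepVerifier.withVirtualTime(() -> failingOnce(subscriptions)
                        .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(10))))
                .expectSubscription()
                // Advance the virtual clock past the fixed retry delay.
                .thenAwait(Duration.ofSeconds(10))
                .expectNext("myObject")
                .verifyComplete();

        return subscriptions.get();
    }
}
```

In a real test you would return the deferred Mono from the mocked myObjectFactory.createMyObject() and assert on the subscription counter (or on the mock) after verification. Because each subscription decides its own outcome, there is no ordering problem between emitting the value and the retry resubscribing.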