spring-boot spring-webflux reactive-mongo-java

How do you propagate exceptions from within WebFlux Mono and Flux streams to the caller?


I have a service that handles the insertion of a new record into a MongoDB collection:

public Mono<ProductDto> insertProduct(Mono<ProductDto> in) {
        //TODO Must handle Duplicate key inserts --> Throw a ProductAlreadyExistsException
        Mono<ProductDto> productDtoMono ;

        try{
            productDtoMono= in.map(ProductDto::toEntity)
                    .flatMap(productRepository::insert)
                    .map(ProductDto::new)
            ;
        }
        catch (DuplicateKeyException ex) {
            throw new ProductAlreadyExistsException();
        }

        return productDtoMono;
    }

When the given ID is already in use, the application throws an org.springframework.dao.DuplicateKeyException.

I am aware that the try/catch block in the code above is incorrect; it is mostly there to demonstrate what I want to do. I am very new to WebFlux and reactive programming. I'd like to find out the correct way to handle this, but I have not been able to find much decent sample code for exception handling in the service layer; the examples are almost always in the router or request handler layer.

Hoping someone might be able to guide me on this.

To be clear about the goal: the DuplicateKeyException should be caught, and the application should instead throw the custom ProductAlreadyExistsException created for this purpose.

I have also tried to do this within the flatMap insert, but at this point I am kind of throwing poop at the wall to see if I can stumble into how it should be done:

public Mono<ProductDto> insertProduct(Mono<ProductDto> in) {
        //TODO Must handle Duplicate key inserts --> Throw a ProductAlreadyExistsException
        Mono<ProductDto> productDtoMono ;

            productDtoMono= in.map(ProductDto::toEntity)
                    .flatMap(p -> {
                        try{
                            return productRepository.insert(p);
                        }
                        catch (DuplicateKeyException ex) {
                            return Mono.error(new ProductAlreadyExistsException());
                        }
                    })
                    .map(ProductDto::new)
            ;

        return productDtoMono;
    }

Solution

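  • DuplicateKeyException is an unchecked exception (checked exceptions are more awkward to use in reactive code). It also reaches the caller as an error signal in the stream once the Mono is subscribed, rather than being thrown at the point where the pipeline is assembled, so a try/catch around the chain never sees it. Translate it inside the pipeline with the onErrorMap() operator: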

    public Mono<ProductDto> insertProduct(Mono<ProductDto> in) {
        return in.map(ProductDto::toEntity)
            .flatMap(productRepository::insert)
            .onErrorMap(DuplicateKeyException.class, e -> new ProductAlreadyExistsException())
            .map(ProductDto::new);
    }
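
    The reason the try/catch in the question never fires can be shown without Reactor on the classpath. What follows is a minimal JDK-only sketch that uses CompletableFuture as a stand-in for Mono; it is an analogy, not the Reactor API. The class DeferredErrorSketch, the methods insert, catchBlockRuns, insertTranslated and failureOf, and the two nested exception classes are all hypothetical stand-ins made up for illustration. The handle() callback plays roughly the role that onErrorMap() plays in the answer above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class DeferredErrorSketch {

    // Hypothetical stand-ins for the two exception types named in the question.
    static class DuplicateKeyException extends RuntimeException {}
    static class ProductAlreadyExistsException extends RuntimeException {}

    // Hypothetical repository call: it does not throw, it returns a container
    // that already holds the failure (assumed behaviour for this sketch).
    static CompletableFuture<String> insert(String id) {
        return CompletableFuture.failedFuture(new DuplicateKeyException());
    }

    // Mirrors the question's attempt: try/catch around the call itself.
    // Nothing is thrown here, so the catch block is never entered.
    static boolean catchBlockRuns(String id) {
        try {
            insert(id);
            return false;
        } catch (DuplicateKeyException ex) {
            return true;
        }
    }

    // Translates the failure inside the pipeline instead.
    static CompletableFuture<String> insertTranslated(String id) {
        return insert(id).handle((value, error) -> {
            // Dependent stages may see the failure wrapped in a CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            if (cause instanceof DuplicateKeyException) {
                throw new ProductAlreadyExistsException();
            }
            if (cause != null) {
                throw new CompletionException(cause); // pass other failures through
            }
            return value;
        });
    }

    // Helper: waits for the future and returns its failure, or null on success.
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }
}
```

    In this sketch catchBlockRuns("p-1") returns false, because the failure travels inside the returned future, while failureOf(insertTranslated("p-1")) yields a ProductAlreadyExistsException. The same reasoning applies to the Mono returned by productRepository.insert: the error has to be mapped by an operator in the chain.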