spring-bootmonoreactive-programmingspring-webfluxwebflux

Mono not executing with schedule


I have a Spring webflux app with the below method.

@Override
public Mono<Integer> updateSetting(int orgId, IntegrationDto dto,
        Map<String, Object> jsonMap) {
    return retrieveServices(dto.getClientId()).flatMap(services -> {
        jsonMap.put("service", services);
        return categoryRepository.findCategoryIdCountByName("test", orgId)
                .flatMap(categoryIdCount -> {
                    final ServiceDto serviceInput = new ServiceDto();
                    if (categoryIdCount == 0) {
                        return inventoryCategoryRepository.save(InventoryCategory.of("test", orgId))
                                .flatMap(category -> {
                                    return saveServices(serviceInput, orgId, jsonMap,
                                            category.getCategoryId());
                                });
                    } else {
                        // Some Logc here ...
                    }
                });
    }).onErrorResume(e -> {
        if (e instanceof WebClientResponseException) {
            int statusCode = ((WebClientResponseException) e).getRawStatusCode();
            throw new LabServiceException("Unable to connect to the service !", statusCode);
        }
        throw new ServiceException("Error connecting to the service !");
    });
}

private Mono<Services> retrieveServices(final String clientId) {
    return webClient.get().uri(props.getBaseUrl() + "/api/v1/services")
            .retrieve().bodyToMono(Services.class);
}

private Mono<Integer> saveInventories(ServiceInput serviceInput, int orgId, Map<String, Object> jsonMap,
        Long categoryId) {
    return refreshInventories(serviceInput, orgId, categoryId).flatMap(reponse -> {
        return updateSetting(branchId, jsonMap);
    });
}


private Mono<Integer> refreshInventories(ServiceInput serviceInput, int orgId, Long categoryId) {
    return inventoryRepository.findAllCodesByTypeBranchId(branchId).collectList().flatMap(codes -> {
        return retrieveAvailableServices(Optional.of(serviceInput), categoryId).flatMap(services -> {
            List<Inventory> inventories = services.stream()
                    .filter(inventory -> !codes.contains(inventory.getCode()))
                    .map(inventoryDto -> toInventory(inventoryDto, branchId)).collect(Collectors.toList());
            if (inventories.size() > 0) {
                return saveAllInventories(inventories).flatMap(response -> {
                    return Mono.just(orgId);
                });
            } else {
                return Mono.just(orgId);
            }
        });
    });
}

Here, the updateSettig public method is being invoked from a REST call and all gets executed as expected.

Now, I want to execute the same with a different flow as well like a scheduler.

When I invoke from a scheduler also, It works.

updateSetting(orgId, dto, jsonMap).subscribe();

But, I want to wait until the updateSetting gets executed.

So, tried with the code below.

updateSetting(orgId, dto, jsonMap).flatMap(response -> {
  ////
});

With the above code, updateSetting method gets invoked, but not getting into the retrieveServices.

return retrieveServices(dto.getClientId()).flatMap(services -> {

Solution

  • You always need to subscribe in the end. So your code should be:

    updateSetting(orgId, dto, jsonMap).flatMap(response -> {
      ////
    }).subscribe();