javareactorflatmap

simple for loop java reactor post flatmap


My problem is that I do a post request to get the total number of elements in my db and I need to do a for loop until I reach that number integer division 10.

My current not working code

protected Mono<List<Long>> getAllSubscriptionIds(ProductCode productCode) {
    List<Long> subscriptionIds = new ArrayList<>();

    String body = "{\n" +
            " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
            " \"pagination\": {\n" +
            "     \"offset\": 0,\n" +
            "     \"limit\": 10" +
            "\n  }\n" +
            " }";
    //first post where I get the number of elements in my db
    return restClient.post(
                    "https://" + url,
                    buildRequiredHeaders(),
                    body,
                    String.class
            )
            .onErrorMap(err-> new RuntimeException(err.getMessage()))
            .flatMap(response -> {
                log.debug(response);
                ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
                try {
                    variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                    });
                    log.debug(response);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                variable.getPayload().getList().forEach(
                        object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
                //if number of elements > 10
                if(variable.getPayload().getPagination().getResultCount() > 10){
                    //for loop on that number / 10 (so that I can use an offset
                    for (int i = 0; i < variable.getPayload().getPagination().getResultCount() / 10; i++){
                        String bodyI = "{\n" +
                                " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                                " \"pagination\": {\n" +
                                "     \"offset\": " + (i + 1) * 10 + ",\n" +
                                "     \"limit\": 10\n" +
                                "  }\n" +
                                " }";
                        return restClient.post(
                                        "https://" + url,
                                        buildRequiredHeaders(),
                                        bodyI,
                                        String.class
                                )
                                .onErrorMap(err-> new RuntimeException(err.getMessage()))
                                .flatMap(resp -> {
                                    ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
                                    try {
                                        varia = JsonUtil.fromString(resp, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                                        });
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    varia.getPayload().getList().forEach(
                                            object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

                                    return Mono.just(subscriptionIds);
                                });
                    }
                }
                return Mono.just(subscriptionIds);
            });
}

I do understand why this does not work (it return inside the for loop) but I don't really understand what alternative can I use to make it work. I tried an external method but it will still fail. I tried a Mono.zip but I think I tried it wrong.

This is an alternative that I tried but it still does not work.

protected Mono<Object> getAllSubscriptionIds(ProductCode productCode) {
this.counter = 0;
List<Long> subscriptionIds = new ArrayList<>();
List<Mono<Integer>> resultList = new ArrayList<>();

String body = "{\n" +
        " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
        " \"pagination\": {\n" +
        "     \"offset\": 0,\n" +
        "     \"limit\": 10" +
        "\n  }\n" +
        " }";

return restClient.post(
                "https://" + url,
                buildRequiredHeaders(),
                body,
                String.class
        )
        .onErrorMap(err-> new RuntimeException(err.getMessage()))
        .flatMap(response -> {
            log.debug(response);
            ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
            try {
                variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                });
                log.debug(response);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            variable.getPayload().getList().forEach(
                    object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

            if(variable.getPayload().getPagination().getResultCount() > 10){
                for (int i = 0; i < variable.getPayload().getPagination().getResultCount() / 10; i++){
                    resultList.add(Mono.just(i));
                }
            }

            return Mono.zip(resultList, intMono -> {
                this.counter++;
                String bodyI = "{\n" +
                        " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                        " \"pagination\": {\n" +
                        "     \"offset\": " + this.counter * 10 + ",\n" +
                        "     \"limit\": 10\n" +
                        "  }\n" +
                        " }";
                return restClient.post(
                                "https://" + url,
                                buildRequiredHeaders(),
                                bodyI,
                                String.class
                        )
                        .onErrorMap(err-> new RuntimeException(err.getMessage()))
                        .flatMap(resp -> {
                            ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
                            try {
                                varia = JsonUtil.fromString(resp, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                                });
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            varia.getPayload().getList().forEach(
                                    object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

                            return Mono.just(subscriptionIds);
                        });
            });
           // return Mono.just(subscriptionIds);
        });
}

Any idea how to solve this?


Solution

  • Ok finally I got a solution

    protected Flux<Object> getAllSubscriptionIds(ProductCode productCode) {
        List<Long> subscriptionIds = new ArrayList<>();
        AtomicInteger i = new AtomicInteger();
    
        String body = "{\n" +
                " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                " \"pagination\": {\n" +
                "     \"offset\": 0,\n" +
                "     \"limit\": 1000" +
                "\n  }\n" +
                " }";
    
        return restClient.post(
                        "https://" + url,
                        buildRequiredHeaders(),
                        body,
                        String.class
                )
                .onErrorMap(err-> new RuntimeException(err.getMessage()))
                .flatMapMany(response -> {
                    log.debug(response);
                    ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
                    try {
                        variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                        });
                        log.debug(response);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    variable.getPayload().getList().forEach(
                            object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
    
                    if(variable.getPayload().getPagination().getResultCount() > 1000){
                            String bodyI = "{\n" +
                                    " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                                    " \"pagination\": {\n" +
                                    "     \"offset\": " + i.incrementAndGet() * 1000 + ",\n" +
                                    "     \"limit\": 1000\n" +
                                    "  }\n" +
                                    " }";
                            return restClient.post(
                                            "https://" + url,
                                            buildRequiredHeaders(),
                                            bodyI,
                                            String.class
                                    )
                                    .onErrorMap(err-> new RuntimeException(err.getMessage()))
                                    .flatMap(resp -> {
                                        return restClient.post(
                                                        "https://" + url,
                                                        buildRequiredHeaders(),
                                                        "{\n" +
                                                                " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                                                                " \"pagination\": {\n" +
                                                                "     \"offset\": " + i.incrementAndGet() * 1000 + ",\n" +
                                                                "     \"limit\": 1000\n" +
                                                                "  }\n" +
                                                                " }",
                                                        String.class
                                                )
                                                .onErrorMap(err-> new RuntimeException(err.getMessage()))
                                                .flatMap(respI -> {
                                                    ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
                                                    try {
                                                        varia = JsonUtil.fromString(respI, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                                                        });
                                                    } catch (IOException e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                    varia.getPayload().getList().forEach(
                                                            object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
    
                                                    return Mono.just(subscriptionIds);
                                                });
                                    }).repeat(variable.getPayload().getPagination().getResultCount() / 1000);
                    }
                    return Mono.just(subscriptionIds);
                });
    }
    

    Basically I changed the first flatMap to flatMapMany so that inside that I could have a flatMap with a repeat loop. I had to return a Flux instead of my original Mono<List> but since I know it will always result in a Mono<List> anyway I changed original caller to

    return getAllSubscriptionIds(request.getEventMetadata().getProductCode()).collect(Collectors.reducing((i1, i2) -> i1)).flatMap(responseIds -> {
            List<BillableApiCall> queryResults = dataLakeMapper.getBillableCallsApiCheckIban(
                    ((ArrayList<Long>)responseIds.get()),
                    DateUtil.toLocalDateEuropeRome(request.getFromDate()),
                    DateUtil.toLocalDateEuropeRome(request.getToDate()),
                    request.getPagination()
            );
    

    So I had to add .collect(Collectors.reducing((i1, i2) -> i1)) (I copy/pasted this so I only guess what it does... it convert the Flux to Mono), and cast my responseIds with ((ArrayList)responseIds.get()).

    repeat was not the final solution since it only repeat what's inside the flatMap (it will not repeat the post connected to it) so I had to use a trick... I removed the for loop that was not necessary, I made a post inside my flatMap repeat with another flatMap... The only missing things was to keep track of my index and I was able to find that you can use an AtomicInteger to do that. It was not an easy task at all but I tested it and it's working. To recap:

    1. flatMapMany with a repeat flatMap inside (repeat only take a long as an argument, so it will repeat until it reach that value and auto-increment... but you cannot use this index for what I understand).
    2. Another flatMap inside the flatMap repeat, that's because you cannot do another post call without this workaround as repeat will only repeat what's inside of the flatMap (not the post call before that but it can do the post call inside it).
    3. An AtomicInteger as your index.
    4. Change the return type to Flux, collect and cast.

    Hope someone will benefit from my headache.