My problem is that I do a post request to get the total number of elements in my db and I need to do a for loop until I reach that number integer division 10.
My current not working code
protected Mono<List<Long>> getAllSubscriptionIds(ProductCode productCode) {
List<Long> subscriptionIds = new ArrayList<>();
String body = "{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": 0,\n" +
" \"limit\": 10" +
"\n }\n" +
" }";
//first post where I get the number of elements in my db
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
body,
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMap(response -> {
log.debug(response);
ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
try {
variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
});
log.debug(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
variable.getPayload().getList().forEach(
object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
//if number of elements > 10
if(variable.getPayload().getPagination().getResultCount() > 10){
//for loop on that number / 10 (so that I can use an offset
for (int i = 0; i < variable.getPayload().getPagination().getResultCount() / 10; i++){
String bodyI = "{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": " + (i + 1) * 10 + ",\n" +
" \"limit\": 10\n" +
" }\n" +
" }";
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
bodyI,
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMap(resp -> {
ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
try {
varia = JsonUtil.fromString(resp, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
varia.getPayload().getList().forEach(
object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
return Mono.just(subscriptionIds);
});
}
}
return Mono.just(subscriptionIds);
});
}
I do understand why this does not work (it return inside the for loop) but I don't really understand what alternative can I use to make it work. I tried an external method but it will still fail. I tried a Mono.zip but I think I tried it wrong.
This is an alternative that I tried but it still does not work.
protected Mono<Object> getAllSubscriptionIds(ProductCode productCode) {
this.counter = 0;
List<Long> subscriptionIds = new ArrayList<>();
List<Mono<Integer>> resultList = new ArrayList<>();
String body = "{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": 0,\n" +
" \"limit\": 10" +
"\n }\n" +
" }";
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
body,
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMap(response -> {
log.debug(response);
ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
try {
variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
});
log.debug(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
variable.getPayload().getList().forEach(
object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
if(variable.getPayload().getPagination().getResultCount() > 10){
for (int i = 0; i < variable.getPayload().getPagination().getResultCount() / 10; i++){
resultList.add(Mono.just(i));
}
}
return Mono.zip(resultList, intMono -> {
this.counter++;
String bodyI = "{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": " + this.counter * 10 + ",\n" +
" \"limit\": 10\n" +
" }\n" +
" }";
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
bodyI,
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMap(resp -> {
ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
try {
varia = JsonUtil.fromString(resp, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
varia.getPayload().getList().forEach(
object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
return Mono.just(subscriptionIds);
});
});
// return Mono.just(subscriptionIds);
});
}
Any idea how to solve this?
Ok finally I got a solution
protected Flux<Object> getAllSubscriptionIds(ProductCode productCode) {
List<Long> subscriptionIds = new ArrayList<>();
AtomicInteger i = new AtomicInteger();
String body = "{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": 0,\n" +
" \"limit\": 1000" +
"\n }\n" +
" }";
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
body,
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMapMany(response -> {
log.debug(response);
ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
try {
variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
});
log.debug(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
variable.getPayload().getList().forEach(
object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
if(variable.getPayload().getPagination().getResultCount() > 1000){
String bodyI = "{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": " + i.incrementAndGet() * 1000 + ",\n" +
" \"limit\": 1000\n" +
" }\n" +
" }";
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
bodyI,
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMap(resp -> {
return restClient.post(
"https://" + url,
buildRequiredHeaders(),
"{\n" +
" \"productCodes\": [\"" + productCode.name() + "\"],\n" +
" \"pagination\": {\n" +
" \"offset\": " + i.incrementAndGet() * 1000 + ",\n" +
" \"limit\": 1000\n" +
" }\n" +
" }",
String.class
)
.onErrorMap(err-> new RuntimeException(err.getMessage()))
.flatMap(respI -> {
ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
try {
varia = JsonUtil.fromString(respI, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
});
} catch (IOException e) {
throw new RuntimeException(e);
}
varia.getPayload().getList().forEach(
object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
return Mono.just(subscriptionIds);
});
}).repeat(variable.getPayload().getPagination().getResultCount() / 1000);
}
return Mono.just(subscriptionIds);
});
}
Basically I changed the first flatMap to flatMapMany so that inside that I could have a flatMap with a repeat loop. I had to return a Flux instead of my original Mono<List> but since I know it will always result in a Mono<List> anyway I changed original caller to
return getAllSubscriptionIds(request.getEventMetadata().getProductCode()).collect(Collectors.reducing((i1, i2) -> i1)).flatMap(responseIds -> {
List<BillableApiCall> queryResults = dataLakeMapper.getBillableCallsApiCheckIban(
((ArrayList<Long>)responseIds.get()),
DateUtil.toLocalDateEuropeRome(request.getFromDate()),
DateUtil.toLocalDateEuropeRome(request.getToDate()),
request.getPagination()
);
So I had to add .collect(Collectors.reducing((i1, i2) -> i1)) (I copy/pasted this so I only guess what it does... it convert the Flux to Mono), and cast my responseIds with ((ArrayList)responseIds.get()).
repeat was not the final solution since it only repeat what's inside the flatMap (it will not repeat the post connected to it) so I had to use a trick... I removed the for loop that was not necessary, I made a post inside my flatMap repeat with another flatMap... The only missing things was to keep track of my index and I was able to find that you can use an AtomicInteger to do that. It was not an easy task at all but I tested it and it's working. To recap:
Hope someone will benefit from my headache.