springspring-webfluxreactive-programmingbackpressure

How to strip objects inside an array in Spring Webflux in order to get each object through an event


What I want to achieve is to stream the objects under the JSON property "ranges" through flux events in order to apply a backpreasure mechanism.

I have a Json string which looks like this:

{
    "name": "the name",
    "id": "34343",
    "ranges": [
        {
            "range": "1 to 1000",
            "value": "11",
        },
        {
            "range": "1000-2000",
            "value": "12",
        },

I also have the java mapping in a class:

public class WholeJson {

    @JsonProperty("name")
    String name;

    @JsonProperty("id")
    String id;

    @JsonProperty("ranges")
    List<Range> ranges;
...
}

public class Range {
    private final String range;
    private final String value;

    public IpRange(@JsonProperty(value = "range") String range,
                   @JsonProperty(value = "value") String value) {

        this.range= range;
        this.value= value;
    }
...
...
}

How can I "strip off" the array and get only the list of Json Objects through the webclient, so that I can process the elements inside the array through more than just one event?

I used the webclient to return a list of Range object, which looked plausible to me.

...
...
    Mono<List<Range>> response = client.get()
                   .retrieve()
                   .bodyToMono(WholeJson.class)
                   .map(WholeJson::getRanges);

     Flux<List<Range>> secondresponse = Flux.from(response);

     secondresponse.subscribe(
                data -> onNext(data), // onNext
                err -> onError(err),  // onError
                () -> onComplete() // onComplete
        );
    }

    private static <T> void onNext(T data) {
        System.out.println("onNext: Data received: " + data);
    }

    private static <T> void onError(Throwable err) {
        System.out.println("onError: Exception occurred: " + err.getMessage());
    }

    private static <T> void onComplete() {
        System.out.println("onComplete: Completed!");
    }

But the output in the console shows that only one element is returned, that is the array (so the array is in fact a mono representing 1 element)

onNext: Data received: [range: 1-1000
value: 11
range: 1000-2000
value: 12]

What I expected was, that I'll receive an onNext-Event for each of the objects. What can I do to get the objects inside the array processed by events?

Thank you very much in advance for your help.


Solution

  • You create a flux from a single object which happen to be a List instance but Flux doesn't know that you want to process the elements of it.

    You have to convert to a Flux of its items like that:

    Flux<Range> secondresponse = Flux.from(response)
        .flatMap(Flux::fromIterable);
    

    Or in other way:

    Mono<WholeJson> response = client.get()
                   .retrieve()
                   .bodyToMono(WholeJson.class);
    
    Flux<Range> secondresponse = Flux.from(response)
        .flatMap(json -> Flux.fromIterable(json.getRanges));