Tags: spring-boot, recursion, spring-webflux, reactive-programming, project-reactor

Reactive Spring Boot: calling an API recursively based on a response parameter of the same API call


I have an external API that returns a list of Student objects along with count and offset parameters, which indicate the number of students remaining in the database.

It is similar to a paginated response, but it does not include Pageable information.

The response has the following format:

{
  "students": [
    { "id": 1, "name": "Adam" },
    { "id": 2, "name": "Alan" }
  ],
  "count": 2,
  "offset": 10
}

Question:

I need to write a recursive function in Spring Reactive that calls this API, accumulates all the students into a Flux<Student>, and sends it to the front end.

Limitations:

The maximum number of students I can fetch per call is 2. There could be about 20 students in the database.

Rough algorithm:

do {
    response = getStudents(offset);
    offset = response.offset;
} while (response.count == 2); // stop once a page returns fewer than 2 students

I want to be able to perform this operation using Reactive Spring.

I am guessing I could use something like Flux.generate, takeUntil, etc.

However, I am not sure of the right syntax, since the lambda passed to takeUntil can only capture a final value, not a variable that changes between calls.

Please suggest a way to achieve this in Reactive Spring.

Any help is appreciated. Thanks in advance.


Solution

  • For recursive use cases, Reactor offers Mono.expand (and variants such as Flux.expand and Flux.expandDeep).

    The expand operator is like flatMap, except that it is also re-applied to the elements it produces, which creates a recursion effect.

    For your use case, the pseudo-code could be:

    int count = 2;
    Mono<Response> firstPage = service.getFirstPage(count);
    Flux<Response> allPages = firstPage.expand(response -> {
      if (response.moreAvailable) return service.nextPage(response.offset, count);
      else return Mono.empty();
    });
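
To make the "re-applied to the elements it produces" behaviour concrete, here is a minimal plain-Java sketch of the same idea for a single-valued source. It is only a model for illustration, not Reactor's implementation: the class name ExpandModel and its expand helper are hypothetical, and Optional stands in for a Mono that either emits one value or completes empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class ExpandModel {

    /**
     * Emits the seed, applies the function to it, emits the result, applies the
     * function to that result, and so on until the function returns empty.
     * Hypothetical helper: a blocking illustration of the expand idea only.
     */
    public static <T> List<T> expand(T seed, Function<T, Optional<T>> next) {
        List<T> emitted = new ArrayList<>();
        Optional<T> current = Optional.of(seed);
        while (current.isPresent()) {
            T value = current.get();
            emitted.add(value);          // every produced element is emitted...
            current = next.apply(value); // ...and fed back into the same function
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Offsets requested for 9 records fetched 2 at a time (assumed numbers).
        List<Integer> offsets = expand(0, offset ->
                offset + 2 < 9 ? Optional.of(offset + 2) : Optional.empty());
        System.out.println(offsets); // prints [0, 2, 4, 6, 8]
    }
}
```

Returning Optional.empty() here plays the role of returning Mono.empty() in the pseudo-code above: it is what ends the recursion.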
    

    Here is a complete minimal reproducible example that mocks the service, returning pages from in-memory records:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.util.Collections;
    import java.util.List;
    
    import static java.lang.Math.min;
    
    public class ExpandPages {
    
        // Data models
        record Student(int id, String name) {}
        record Page(int remaining, int nextOffset, List<Student> students) {}
    
        /**
         * Mock a service that sends pages of students
         * @param all All available students
         */
        record StudentRegistry(List<Student> all) {
            StudentRegistry {
                if (all == null || all.isEmpty()) throw new IllegalArgumentException("Null or empty student list");
                all = Collections.unmodifiableList(all);
            }
    
            /**
             * Request a page of students.
             *
             * @return A single page of students, starting at provided offset, with a maximum count of provided count.
             */
            public Mono<Page> next(int offset, int count) {
                if (offset < 0 || offset >= all.size()) throw new IllegalArgumentException("Bad offset");
                if (count < 1) throw new IllegalArgumentException("Bad count");
    
                count = min(count, all.size() - offset);
                int nextOffset = offset + count;
                int remaining = all.size() - nextOffset;
                return Mono.just(new Page(remaining, nextOffset, all.subList(offset, offset + count)));
            }
        }
    
        public static void main(String[] args) {
            final var registry = new StudentRegistry(List.of(
                    new Student(1, "John"),
                    new Student(2, "Jane"),
                    new Student(3, "Jack"),
                    new Student(4, "Jules"),
                    new Student(5, "Julie"),
                    new Student(6, "James"),
                    new Student(7, "Joe"),
                    new Student(8, "Johanna"),
                    new Student(9, "Jolly Jumper")
            ));
    
            final int queriedCount = 2;
            Flux<Page> pages = registry
                    // Get first page
                    .next(0, queriedCount)
                    // Recurse on each received page: check if there's more, then ask for the next available page
                    .expand(response -> {
                        System.out.println("Received page "+response);
                        if (response.remaining() <= 0) {
                            System.out.println("No more pages to fetch.");
                            return Mono.empty(); // Ends recursion
                        } else {
                            return registry.next(response.nextOffset(), min(queriedCount, response.remaining()));
                        }
                    });
    
            // Trigger flow consumption: print all received students
            pages.flatMapIterable(Page::students)
                 .doOnNext(System.out::println)
                 .blockLast();
        }
    }
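
The example above uses its own Page model (remaining, nextOffset), whereas the question's response carries students, count and offset. Below is a rough sketch of how the stop condition might look for that shape. It rests on assumptions: that count is the number of students in the current page, that offset is the value to pass to the next call, and that a page shorter than the page size means nothing is left. StudentPaging, StudentsResponse, nextOffset and fetchAll are hypothetical names, and the loop is a blocking stand-in used only to exercise the condition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.IntFunction;

public class StudentPaging {

    record Student(int id, String name) {}

    // Mirrors the JSON in the question. Assumption: "count" is the size of this
    // page and "offset" is the value to send with the next request.
    record StudentsResponse(List<Student> students, int count, int offset) {}

    /** Offset for the next call, or empty once a short (last) page is received. */
    static OptionalInt nextOffset(StudentsResponse response, int pageSize) {
        return response.count() < pageSize
                ? OptionalInt.empty()
                : OptionalInt.of(response.offset());
    }

    /** Blocking stand-in for the reactive chain: follow offsets until the stop condition holds. */
    static List<Student> fetchAll(IntFunction<StudentsResponse> api, int pageSize) {
        List<Student> all = new ArrayList<>();
        OptionalInt offset = OptionalInt.of(0);
        while (offset.isPresent()) {
            StudentsResponse page = api.apply(offset.getAsInt());
            all.addAll(page.students());
            offset = nextOffset(page, pageSize);
        }
        return all;
    }

    public static void main(String[] args) {
        // Hypothetical in-memory API holding three students, served two per page.
        List<Student> db = List.of(
                new Student(1, "Adam"), new Student(2, "Alan"), new Student(3, "Anna"));
        IntFunction<StudentsResponse> api = offset -> {
            List<Student> page = db.subList(offset, Math.min(offset + 2, db.size()));
            return new StudentsResponse(page, page.size(), offset + page.size());
        };
        System.out.println(fetchAll(api, 2).size()); // prints 3
    }
}
```

In the reactive version, the same check would sit inside the expand lambda: request the next page when nextOffset is present, and return Mono.empty() otherwise. Note that with this stop condition, a total that is an exact multiple of the page size costs one extra call that returns an empty page.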