postgresql, spring-webflux, reactive-streams

When fetching 4000 or more values from a database (PostgreSQL), it is not possible to filter the elements (using Reactive Streams)


The flow of values from the database must be filtered (in a reactive, non-blocking style). For each incoming element, take its email field and validate the value. Processing should stop after the first valid result.

However, several thousand elements may have to be processed before the first suitable value is found.

public class User {

    @Id
    Long id;

    String email;
    
    String phoneHome;
}

There is a search by phone number:

private Mono<Response> findUserByHomePhone(Response response) {

    return Mono.just(response)
            .flatMap(this::retrieveUserList);
}
private Mono<Response> retrieveUserList (Response response) {

    String phone = retrievePhoneFromResponse (response);
  
    return Mono.from(userService.getByPhone (phone)
                    .groupBy(User::getId)
                    .flatMap(this::processGroupedObjects)
                    .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)))
            .as(Log.of(log,
                    "Search by phone {}",
                    phone (response))::info);
}

select * from users where phone = $1

private Mono< Response > getFirsFoundElement(Signal<? extends User> signal, Response response) {

    boolean isFoundElement = signal.hasValue();

    if (isFoundElement) {
        return Mono.just(response);
    }

    return Mono.error(new Exception());
}


private Flux<User> processGroupedObjects(GroupedFlux<Long, User> group) {

    return group
            .mapNotNull(this::checkEmailOnNull);
}

private User checkEmailOnNull (User user) {

    String email = user.getEmail();

    if (Strings.isEmpty(email)){
        return null;
    }

    return user;
}

However, approximately at this point (processGroupedObjects(GroupedFlux<Long, User> group)) a freeze occurs, and I do not see any errors in the console.

The number of source elements in the database ranges from 4,000 to 25,000 (the algorithm worked for 3,000).

I found this in the documentation on GroupedFlux:

Grouping is best suited for when you have a medium to low number of groups. The groups must also imperatively be consumed (such as by a flatMap) so that groupBy continues fetching data from upstream and feeding more groups. Sometimes, these two constraints multiply and lead to hangs, such as when you have a high cardinality and the concurrency of the flatMap consuming the groups is too low.

However, it is not clear to me what the documentation means. I think it describes my case, but I am not sure.

Does anyone have an idea why the freeze occurs and how it could be fixed?


Solution

  • As you found out, grouping doesn't work for a flow with many groups, so I suggest avoiding groups altogether.

    As your goal is just to find any user with an email, it's simple:

    userService.getByPhone (phone)
      .filter((user) -> user.getEmail() != null)
      .next()
    

    If you want to get emails for all users, you don't need a groupBy either; just use distinct:

    userService.getByPhone (phone)
      .filter((user) -> user.getEmail() != null)
      .distinct((user) -> user.getId())
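
To see the first variant stop early, here is a minimal, self-contained sketch. It assumes reactor-core is on the classpath and Java 16+ for the record. The names FirstUserWithEmail, hasEmail, the User record and the simulated getByPhone(total, firstValid) are hypothetical stand-ins, not the asker's actual service. The email check also rejects empty strings, which approximates the Strings.isEmpty check from the question.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

public class FirstUserWithEmail {

    // Hypothetical stand-in for the User entity from the question.
    record User(Long id, String email, String phoneHome) {}

    // Counts how many elements the simulated source actually emitted.
    static final AtomicInteger emitted = new AtomicInteger();

    // Simulates userService.getByPhone(phone): `total` users, where only users
    // with index >= firstValid have a non-empty email (others are null or "").
    static Flux<User> getByPhone(int total, int firstValid) {
        return Flux.range(0, total)
                .doOnNext(i -> emitted.incrementAndGet())
                .map(i -> {
                    String email = i >= firstValid
                            ? "user" + i + "@example.com"
                            : (i % 2 == 0 ? null : "");
                    return new User((long) i, email, "555-0100");
                });
    }

    // Treats both null and empty emails as "no email".
    static boolean hasEmail(User user) {
        return user.email() != null && !user.email().isEmpty();
    }

    // No groupBy: filter the flat stream and take the first match.
    // next() cancels the upstream once the first element arrives.
    static Mono<User> firstUserWithEmail(Flux<User> users) {
        return users.filter(FirstUserWithEmail::hasEmail).next();
    }

    public static void main(String[] args) {
        // block() is used only for this demo; do not block in a WebFlux handler.
        User found = firstUserWithEmail(getByPhone(25_000, 20_000)).block();
        System.out.println("found id: " + found.id());
        System.out.println("elements emitted by source: " + emitted.get());
    }
}
```

If no user has an email, the Mono completes empty. In the question's method this could presumably be turned into the Response with something like .map(user -> response).switchIfEmpty(Mono.error(new Exception())), which would replace the switchOnFirst step.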