I have just started with functional programming in Java and am facing some difficulty. I'm writing a method that establishes a reactive session with the database and returns a Flux to the caller. The caller then subscribes to this Flux and fetches results as needed. I'm trying to mimic this example here.
What I have is:
    return Flux.usingWhen(
            Mono.just(getDataStore().getRxSession()),
            session -> Flux.from(session.run(query).records()),
            RxSession::close);
and then a different function subscribes to this Flux:
    Flux<Record> rflux = query.sub();
    rflux.takeUntil(/* implement wait and notifier here */).subscribe(/* notify here */);
Found my answer here: https://www.baeldung.com/reactor-core
I can use the methods of the Subscription interface to request the next set of records.
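    Flux<Record> rflux = query.sub();
    rflux.subscribe(new Subscriber<Record>() {
        private Subscription s;

        @Override
        public void onSubscribe(Subscription sub) {
            this.s = sub;
            // Nothing is emitted until the first request is made.
            s.request(reqParam);
        }

        @Override
        public void onNext(Record record) {
            System.out.println("The record that is pulled is " + record.toString());
            // Demand is cumulative: this adds reqParam more on top of whatever is still outstanding.
            s.request(reqParam);
        }

        @Override
        public void onError(Throwable throwable) {
            // Errors are silently dropped here; log or propagate them in real code.
        }

        @Override
        public void onComplete() {
        }
    });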
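For anyone who wants to try the request-driven pull without a database, here is a minimal sketch. To keep it free of dependencies it uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher (Java 9+) instead of Reactor and RxSession; they follow the same onSubscribe / request(n) / onNext protocol. The names BatchedPull, BatchSubscriber and pullAll are made up for this illustration. Unlike the snippet above, it asks for a new batch only after the previous one has been fully consumed, so outstanding demand never exceeds one batch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BatchedPull {

    /** Hypothetical subscriber that pulls items in batches of batchSize. */
    static final class BatchSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        volatile Throwable error;
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remainingInBatch = batchSize;
            s.request(batchSize); // first batch
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            // Request the next batch only once the current one is used up.
            if (--remainingInBatch == 0) {
                remainingInBatch = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes every element of source and returns what the subscriber received. */
    static List<Integer> pullAll(List<Integer> source, int batchSize) {
        BatchSubscriber<Integer> sub = new BatchSubscriber<>(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            source.forEach(publisher::submit);
        } // close() signals completion to the subscriber
        try {
            if (!sub.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (sub.error != null) {
            throw new IllegalStateException(sub.error);
        }
        return List.copyOf(sub.received);
    }

    public static void main(String[] args) {
        System.out.println(pullAll(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

With Reactor itself, the same idea is usually written by extending BaseSubscriber and calling request(n) from hookOnSubscribe and hookOnNext, rather than implementing Subscriber by hand.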