Using the Java Flow API. I have a chain of Processor
s and a terminal Subscriber
.
I have coded up Subscriber
s that have kept a count of items received that work fine with a SubmissionPublisher
.
However, when I pass values on from a Processor
(which extends SubmissionPublisher
) to the Subscriber
below, no output is printed to the console.
If I remove minValue.get()
the text is printed to the console.
Why does the calling class variable cause onComplete
not to execute as expected?
public class MinValue implements Subscriber<Integer> {
private Flow.Subscription subscription;
private AtomicInteger minValue=new AtomicInteger(Integer.MAX_VALUE);
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(minValue.get()>item){
System.out.println("Min Value : "+item);
minValue.set(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error + throwable.getMessage() \nMin Value is : "+minValue.get());
}
@Override
public void onComplete() {
System.out.println("Successful Completion - Min Value is : "+minValue.get());
}
}
Edit - adding Minimal Reproduceable Example
public class NewClass {
public static void main(String args[]) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new MinValue();
publisher.subscribe(subscriber);
publisher.submit(10);
publisher.submit(11);
publisher.submit(9);
publisher.submit(12);
publisher.submit(8);
publisher.close();
}
}
The issue was that the main thread was exiting before the subscribers onComplete() was called.
If I add a sleep after calling publisher.close() the subscriber.onComplete() gets time to execute on its own thread.