Is there really no official project reactor upgrade guide from v2 to v3?
I've searched far and wide, even using waybackmachine, but I've had no luck finding any upgrade path documentation from v2
to v3
(v2.0.6.RELEASE
to v3.5.15
, to be more precise).
Now, I'm aware that I'm coming in a bit late with this question, given that the first v3
came out somewhere in 2017, but the current version is still v3, so I guess it shouldn't be too late, right?
The best resource I've found so far is this GH issue comment, but I have several other questions.
Specifically, what do I use in v3
instead of these v2
classes:
reactor.core.Dispatcher
reactor.core.dispatch.SynchronousDispatcher
reactor.core.dispatch.RingBufferDispatcher
reactor.core.processor.RingBufferProcessor
(the comment I linked to above says reactor.core.publisher.TopicProcessor
, but that too has since been removed)reactor.rx.action.Action
reactor.rx.stream.GroupedStream
(I assume reactor.core.publisher.GroupedFlux
, since Stream
-> Flux
, but what do I use instead of GroupedStream.lift
?Thanks.
OK, let me answer each of the items from my question with a separate answer (for readability reasons) in the hope that it might help others in the future.
Dispatcher
-> Scheduler
SynchronousDispatcher
-> Schedulers.immediate()
Old:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
public final class ReactiveEventRouter {
// dropping unimportant, unchanged parts and field modifiers for brevity
SynchronousDispatcher syncDispatcher = new SynchronousDispatcher();
CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
reactor.fn.Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);
public <E> void tryRoute(final Object event, Subscription<E>
subscription, Dispatcher dispatcher) {
if (subscription.test(event)) {
dispatcher.dispatch(event, subscription::accept, errorConsumer);
}
}
public <E> void tryRoute(final Object event, Subscription<E> subscription) {
tryRoute(event, subscription, syncDispatcher);
}
public void route(final Object event) {
route(event, syncDispatcher);
}
public void route(final Object event, Dispatcher dispatcher) {
subscriptions.forEach(subscription -> {
if (subscription.test(event)) {
dispatcher.dispatch(event, subscription::accept, errorConsumer);
}
});
}
}
New:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public final class ReactiveEventRouter {
Scheduler immediateScheduler = Schedulers.immediate();
CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);
public <E> void tryRoute(final Object event, Subscription<E> subscription, Scheduler scheduler) {
if (subscription.test(event)) {
Mono.just(event)
.subscribeOn(scheduler)
.doOnError(errorConsumer)
.doOnNext(subscription::accept)
.subscribe();
}
}
public <E> void tryRoute(final Object event, Subscription<E> subscription) {
tryRoute(event, subscription, immediateScheduler);
}
public void route(final Object event) {
route(event, immediateScheduler);
}
public void route(final Object event, Scheduler scheduler) {
subscriptions.forEach(subscription -> {
if (subscription.test(event)) {
Mono.just(event)
.subscribeOn(scheduler)
.doOnError(errorConsumer)
.doOnNext(subscription::accept)
.subscribe();
}
});
}
}
JFYI: a simple inner class ReactiveEventRouter.Subscription
hasn't changed, but I'm including it here to avoid confusion with org.reactivestreams.Subscription
:
public static class Subscription<E> {
final Predicate<Object> matcher;
final Consumer<E> consumer; // this is java.util.function.Consumer
Subscription(Predicate<Object> matcher, Consumer<E> consumer) {
this.matcher = matcher;
this.consumer = consumer;
}
boolean test(Object obj) {
return matcher.test(obj);
}
void accept(Object evt) {
consumer.accept((E) evt);
}
}