javarx-javarx-java3

RxJava subscribe only for specific key


I'm receiving an infinite stream of events, lets say Observable<Event> where Event(userId, payload). I have to provide functionality which allows subscribing users for their events. Observable<Event> is a hot source, but it doesn't matter as I can drop events if there are no interested users on it. I'm trying to do it in pure streaming fashion using RxJava API but it seems difficult for me. What did I try:

  1. Do it with the least effort, using built-in operators.
   Observable<Event> events =; //;
   events = events.publish().refCount();
        
   Observable<Event> subscribeUser(int userId) {
       return events.filter(it -> it.userId == userId);
   }

Works great, the problem is its just slow for my case - I don't need multicast to all subscribers and then filter if it not belongs to any user. Imagine there is tens of thousands users and 1000 events per second - O(n) complexity is too much for me - I need some hashing i think.

  1. PublishSubject
 ConcurrentHashMap<Integer, PublishSubject<Event>> subscriptions = new ConcurrentHashMap<>();
 Observable<Event> events =; //
 events.subscribe(event -> /*pushing on user subject*/);

 Observable<Event> subscribeUser(int userId) {
    return subscriptions.putIfAbsent(userId, PublishSubject.create())
   .doOnTerminate(() -> subscriptions.remove(userId))
 }

This requires more coding (much more than in example above) for managing subscriptions, it's not pure streaming also PublishSubject is designed for multiple subscribers so it takes a little more space for internal arrays which I don't need, because I have only exactly one user per subject.

  1. groupBy() operator I can group events by userId producing GroupedObservable streams, then I can just .ignoreElements() until there is no subscription but I don't know how can I connect these two together. On one side I have hot Observable stream of GroupedObservable<Integer, Event> and then incoming Observable<Event> subscribeUser(int userId) request. How can I say: "hey, from now don't drop elements because there is someone interested on it" and return GroupedObservable as response for request?

  2. custom Observable

Problem here is Observable.create(subscriber -> ) subscriber doesn't have information about its key (userId) - I also have no control over subscribe() method so I cannot pass there my custom implementation and retrieve key after casting or somehow. It's also bad because I need to manage state by myself.

  1. lift() with custom operator Similar problems to 4, even worse because I have no control over creating Observable here so I need to also prevent multiple connections to source

Solution

  • Option 2) is the most straightforward. Option 4 would require more cleverness in coding an Observable.

    ConcurrentMap<Integer, Observer<Event>> observers = ...
    
    void signal(Event event) {
       var observer = observers.get(event.userId);
       if (observer != null) {
           observer.onNext(event);
       }
    }
    
    Observable<Event> observeUser(int userId) {
        return Observable.unsafeCreate(observer -> {
            var remove = Disposable.fromAction(() -> observers.remove(userId));
            subscriber.onSubscribe(remove);
            observers.put(userId, observer);
        });
    }