Tags: rxjs, observable, reactive-programming

RxJS: How would I "manually" update an Observable?


I think I must be misunderstanding something fundamental, because in my mind this should be the most basic case for an observable, but for the life of me I can't figure out how to do it from the docs.

Basically, I want to be able to do this:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

But I have not been able to find a method like push. I'm using this for a click handler, and I know they have Observable.fromEvent for that, but I'm trying to use it with React and I'd rather be able to simply update the datastream in a callback, instead of using a completely different event handling system. So basically I want this:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

The closest I got was using observer.onNext('foo'), but that didn't seem to actually work and that's called on the observer, which doesn't seem right. The observer should be the thing reacting to the data stream, not changing it, right?

Do I just not understand the observer/observable relationship?


Solution

  • In Rx, Observer and Observable are distinct entities. An Observer subscribes to an Observable. An Observable emits items to its Observers by calling the Observers' methods. If you need to call the Observer methods outside the scope of Observable.create(), you can use a Subject, which is a proxy that acts as both an Observer and an Observable at the same time.

    You can do it like this:

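    // A Subject is both an Observable (you can subscribe to it)
    // and an Observer (you can push values into it).
    var eventStream = new Rx.Subject();
    
    var subscription = eventStream.subscribe(
        function (x) {
            console.log('Next: ' + x);
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });
    
    // Call next() from any callback to emit a value to subscribers.
    // (RxJS 4 names this method onNext(); RxJS 5 and later use next().)
    var my_function = function () {
        eventStream.next('foo');
    };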
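
To make the "observer and Observable at the same time" idea concrete, here is a rough, dependency-free sketch of what a Subject does conceptually. `TinySubject`, `handleClick`, and the fake event objects are hypothetical names made up for illustration; this is not how RxJS implements `Subject`, and it leaves out error and completion handling entirely.

```javascript
// Hypothetical stand-in for a Subject, for illustration only (not RxJS's implementation).
class TinySubject {
  constructor() {
    this.observers = new Set();
  }

  // Observable side: register a callback and return a handle that removes it.
  subscribe(onNext) {
    this.observers.add(onNext);
    return { unsubscribe: () => this.observers.delete(onNext) };
  }

  // Observer side: push a value to every current subscriber.
  next(value) {
    for (const onNext of this.observers) {
      onNext(value);
    }
  }
}

const eventStream = new TinySubject();
const received = [];
const subscription = eventStream.subscribe((x) => received.push('Next: ' + x));

// Hypothetical click handler: any plain callback can feed the stream.
function handleClick(event) {
  eventStream.next(event.target.textContent);
}

handleClick({ target: { textContent: 'foo' } }); // fake event object standing in for a real click
subscription.unsubscribe();
handleClick({ target: { textContent: 'bar' } }); // no subscribers left, so nothing is recorded

console.log(received); // [ 'Next: foo' ]
```

The same shape should carry over to a React or jQuery click callback: the handler only calls `next()`, and whatever subscribed to the stream reacts to the value.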
    

    You can find more information about subjects here: