Tags: swift, concurrency, rx-swift, defensive-programming

How to defensively create Rx Observables and avoid race conditions?


I was hunting a weird edge case where a list of files in a directory didn't show results for 0...2 files but worked fine for 3...n files.

It turned out that the original observable sequence worked just fine. But I used a PublishSubject in one subscriber to relay the effect of the change. All of this reportedly happened on the main queue, but it seems the PublishSubject got fed values before it had a subscriber. (Since there's no replay, the subscriber wouldn't know.)
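To make the suspected failure mode concrete, here is a minimal sketch, not the original code. The names `relay` and `replayingRelay` and the `[String]` element type are assumptions for illustration. It shows a PublishSubject dropping an event that arrives before the subscription, and a ReplaySubject with a one-element buffer (a swapped-in alternative, not something from the original set-up) handing that event to the late subscriber.

```swift
import RxSwift

let disposeBag = DisposeBag()

// Hypothetical stand-in for the relaying subject; the element type is assumed.
let relay = PublishSubject<[String]>()

// The upstream emits before the consumer has subscribed, so the event is dropped.
relay.onNext(["a.txt"])

var receivedFromPublish: [[String]] = []
relay
    .subscribe(onNext: { files in receivedFromPublish.append(files) })
    .disposed(by: disposeBag)

// Only events sent after the subscription reach the consumer.
relay.onNext(["a.txt", "b.txt"])

// Same ordering problem, but with a one-element replay buffer.
let replayingRelay = ReplaySubject<[String]>.create(bufferSize: 1)
replayingRelay.onNext(["a.txt"])

var receivedFromReplay: [[String]] = []
replayingRelay
    .subscribe(onNext: { files in receivedFromReplay.append(files) })
    .disposed(by: disposeBag)
```

Everything here runs synchronously on one thread, so the only variable is the order of `onNext` and `subscribe`. That ordering is exactly what becomes hard to see once schedulers are involved.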

So the set-up of all components (origin -- relaying subscriber -- consuming subscriber) seems to have introduced time as a problem.

Weird observations:

Now I don't understand how to handle issues like these defensively.

It seems the PublishSubject might not be a good fit for the situation. But why does observing-on the main queue improve the situation?

When should you (defensively) specify observable sequences to run on the main queue upon creation/production? (Again, this might be a pragmatic fix, but it only accidentally seems to solve the problem.)

Or, put differently, when should you assume things don't happen in time in the consumer's code, i.e. when the subscription is set up?

There was no way for me to tell that relaying events from input sequence to PublishSubject caused trouble. It's not perceivable. Leaves me puzzled how to avoid bugs like this.


Solution

  • You need to understand the difference between observeOn and subscribeOn, and the properties of a Driver sequence.

    observeOn sets the scheduler for the observation blocks, i.e. where you handle the emitted values downstream of the call, e.g.

    someObservable
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { _ in /* this is an example of an observation block */ })
    

    subscribeOn sets the scheduler for the subscription block, i.e. where the values are produced, e.g.

    Observable<Int>.create { _ in /* this is an example of a subscription block */ return Disposables.create() }
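The two operators can be combined in one chain. The following is a minimal sketch under some assumptions: the names `numbers`, `background`, `producedOnMainThread` and `observedOnMainThread` are made up for illustration, and the operator spelling follows the RxSwift 5 style used in this answer (RxSwift 6 renames them to `subscribe(on:)` and `observe(on:)`). It records which thread each block runs on instead of printing.

```swift
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let background = ConcurrentDispatchQueueScheduler(qos: .background)

var producedOnMainThread: Bool?
var observedOnMainThread: Bool?

let numbers = Observable<Int>.create { observer in
    // Subscription block: runs on the scheduler chosen by subscribeOn.
    producedOnMainThread = Thread.isMainThread
    observer.onNext(1)
    observer.onCompleted()
    return Disposables.create()
}

numbers
    .subscribeOn(background)             // where the block above runs
    .observeOn(MainScheduler.instance)   // where the block below runs
    .subscribe(onNext: { _ in
        // Observation block: runs on the main scheduler.
        observedOnMainThread = Thread.isMainThread
    })
    .disposed(by: disposeBag)

// Outside an app (script, playground) the main run loop has to spin
// so that the main queue can deliver the event.
RunLoop.main.run(until: Date().addingTimeInterval(0.5))
```

Note that subscribeOn only decides where the subscription block starts. If the block hands work to another queue itself, values arrive from there unless observeOn moves them.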
    

    Driver

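    Driver is a special type of observable sequence (a trait from RxCocoa) with three properties: it cannot emit errors, it delivers events on the main scheduler, and it shares its side effects while replaying the latest element to new subscribers.

    Considering the last two, you can build your own driver-like sequence from an observable: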

    someObservable
    .observeOn(MainScheduler.instance)
    .share(replay: 1, scope: .whileConnected)
    

    This is probably what you want, judging by your question.
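As a rough illustration of how this helps a late subscriber, here is a minimal sketch. The names `source`, `fileList`, `early` and `late` and the `[String]` element type are assumptions, not taken from the question. It is run from the main thread, where MainScheduler is expected to deliver the event without an extra hop.

```swift
import RxSwift

let disposeBag = DisposeBag()

// Hypothetical source of directory listings; the element type is assumed.
let source = PublishSubject<[String]>()

let fileList = source
    .observeOn(MainScheduler.instance)
    .share(replay: 1, scope: .whileConnected)

var early: [[String]] = []
var late: [[String]] = []

// The first subscriber connects the shared sequence to the source.
fileList
    .subscribe(onNext: { files in early.append(files) })
    .disposed(by: disposeBag)

source.onNext(["a.txt"])

// Subscribes after the event; share(replay: 1) should hand over the latest value.
fileList
    .subscribe(onNext: { files in late.append(files) })
    .disposed(by: disposeBag)
```

One caveat: the replay only covers events that reached the shared sequence while at least one subscriber was connected. Events the source emits before the first subscription are still lost. If RxCocoa is available, `asDriver(onErrorJustReturn:)` gives a comparable result with the no-error guarantee included.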