typescriptrxjsrxjs5rxjs6rxjs-observables

rxjs 5 -> 6 migration: connect operator missing


We started upgrading rxjs from v5 to v6 (without using the compatibility layer, in case it makes any difference).

The majority of the changes are straight forward and well documented in https://github.com/ReactiveX/rxjs/blob/6.x/docs_app/content/guide/v6/migration.md. However, we are not sure at all and couldn't find any good documentation about the v5 connect operator.

We have the following snipper of code:

const syncStatesObservable = Observable.from(intervals)
    .concatMap(i =>
        fetchSyncState(i).concatMap(syncState => {
            if (syncState.version !== version) {
                logger.info(`Regenerate all entries: version ${version}`);
                return clearAndFetchSyncState(i);
            }
            return Observable.of(syncState);
        })
    )
    .publishReplay();

syncStatesObservable.connect();

// syncStatesObservable is then used in other streams

The equivalent v6 snippet (apart from the connect operator) looks like the following:

const syncStatesObservable = from(intervals).pipe(
    concatMap(i =>
        fetchSyncState(i).pipe(
            concatMap(syncState => {
                if (syncState.version !== version) {
                    logger.info(`Regenerate all entries: version ${version}`);
                    return clearAndFetchSyncState(i);
                }
                return of(syncState);
            })
        )
    ),
    publishReplay()
);

syncStatesObservable.connect(); // What to do here???

As per TypeScript, Property 'connect' does not exist on type 'Observable<any>'.

What would be the equivalent for the connect operator in rxjs@6?


Solution

  • It seems like this is purely a TypeScript issue. We were able to fix it by typecasting the observable as follows:

    const syncStatesObservable = from(intervals).pipe(
        concatMap(i =>
            fetchSyncState(i).pipe(
                concatMap(syncState => {
                    if (syncState.version !== version) {
                        logger.info(`Regenerate all entries: version ${version}`);
                        return clearAndFetchSyncState(i);
                    }
                    return of(syncState);
                })
            )
        ),
        publishReplay()
    ) as ConnectableObservable<SyncState>;
    
    syncStatesObservable.connect();
    

    Source: https://github.com/ReactiveX/rxjs/issues/2972