We started upgrading rxjs from v5 to v6 (without using the compatibility layer, in case it makes any difference).
The majority of the changes are straightforward and well documented in https://github.com/ReactiveX/rxjs/blob/6.x/docs_app/content/guide/v6/migration.md. However, we are unsure how to migrate the v5 connect operator and couldn't find any good documentation on it.
We have the following snippet of code:
const syncStatesObservable = Observable.from(intervals)
  .concatMap(i =>
    fetchSyncState(i).concatMap(syncState => {
      if (syncState.version !== version) {
        logger.info(`Regenerate all entries: version ${version}`);
        return clearAndFetchSyncState(i);
      }
      return Observable.of(syncState);
    })
  )
  .publishReplay();
syncStatesObservable.connect();
// syncStatesObservable is then used in other streams
The equivalent v6 snippet (apart from the connect operator) looks like the following:
const syncStatesObservable = from(intervals).pipe(
  concatMap(i =>
    fetchSyncState(i).pipe(
      concatMap(syncState => {
        if (syncState.version !== version) {
          logger.info(`Regenerate all entries: version ${version}`);
          return clearAndFetchSyncState(i);
        }
        return of(syncState);
      })
    )
  ),
  publishReplay()
);
syncStatesObservable.connect(); // What to do here???
TypeScript reports: Property 'connect' does not exist on type 'Observable<any>'.
What would be the equivalent of the connect operator in rxjs@6?
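It seems like this is purely a TypeScript issue: pipe() is typed as returning a plain Observable, even though publishReplay() still produces a ConnectableObservable at runtime. We were able to fix it by casting the observable as follows: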
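import { ConnectableObservable, from, of } from 'rxjs';
import { concatMap, publishReplay } from 'rxjs/operators';

const syncStatesObservable = from(intervals).pipe(
  concatMap(i =>
    fetchSyncState(i).pipe(
      concatMap(syncState => {
        if (syncState.version !== version) {
          logger.info(`Regenerate all entries: version ${version}`);
          return clearAndFetchSyncState(i);
        }
        return of(syncState);
      })
    )
  ),
  publishReplay()
// The cast exposes connect(), which the declared return type of pipe() hides.
) as ConnectableObservable<SyncState>;
syncStatesObservable.connect();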
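For anyone who wants to try the cast in isolation, here is a minimal sketch of the same pattern. It assumes rxjs 6 is installed; fetchValue and values$ are hypothetical stand-ins for fetchSyncState and syncStatesObservable, and the source is synchronous only to keep the example short.

```typescript
import { ConnectableObservable, from, of } from 'rxjs';
import { concatMap, publishReplay } from 'rxjs/operators';

// Hypothetical stand-in for the asynchronous fetch in the question.
const fetchValue = (i: number) => of(i * 10);

// pipe() is declared to return Observable<number>, so the cast restores
// the ConnectableObservable type that publishReplay() yields at runtime.
const values$ = from([1, 2, 3]).pipe(
  concatMap(i => fetchValue(i)),
  publishReplay()
) as ConnectableObservable<number>;

// Start the source right away; connect() returns a Subscription.
const connection = values$.connect();

// A subscriber that arrives after connect() still receives the replayed values.
const received: number[] = [];
values$.subscribe(v => received.push(v));

console.log(received);

// Release the connection once it is no longer needed.
connection.unsubscribe();
```

Keeping the Subscription returned by connect() is optional in the original code, but it gives you a handle to tear down the shared source later.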