In rxjs-land, you can merge multiple Observables with the merge
operator. If you have an array of Observables, you simply spread that array into the merge operator, like so: merge(...arrayOfObservables)
. And if you have a 'higher-order' Observable (an Observable that emits Observables), you can use mergeAll
.
I'm looking to do something similar to mergeAll
with Angular's new Signals. I have a Signal which holds an array of Signals, all_the_states = signal([] as Signal<any>[])
. all_the_states
gets added to (and removed from) over time. That works easily enough via mutate
. Now I want to create a simple Signal latest_state
that always holds the latest value of any of the 'inner' Signals.
I thought I could simply "take a trip to rxjs-land, merge, and go back", i.e. do the following:
latest_state = toSignal(
merge(...this.all_the_states().map((single) => toObservable(single))),
{ initialValue: null }
)
However, when I add something to all_the_states
, latest_state
just stays null.
I've also tried
higher_order_observable: Observable<Observable<any>[]> =
toObservable(this.selection_streams).pipe(
map(signals => signals.map(signal => toObservable(signal)))
)
single_observable: Observable<any> =
this.higher_order_observable.pipe(mergeAll(), mergeAll())
latest_state = toSignal(this.single_observable, { initialValue: null })
which works as far as the types are concerned - though I'm not sure about the double mergeAll
- but in that case Angular gives me the error NG0203: toObservable() can only be used within an injection context such as a constructor, a factory function, a field initializer, or a function used with 'runInInjectionContext'
(before execution even gets to the double mergeAll
).
Thanks to OZ_ for leading me onto the right track! Turns out my initial solution of
latest_state = toSignal(
merge(...this.all_the_states().map((single) => toObservable(single))),
{ initialValue: null }
)
wasn't too far off. I "just" needed to bring the reading of all_the_states
and the toObservable
-calls into a "reactivity" / "injection" context. Namely by wrapping with Angular's runInInjectionContext, and a local Injector
. Plus an additional mergeAll
was needed.
So, here's the code: In the Angular Component or - in my case - Service, get a hold of an Injector:
import { Injector } from '@angular/core';
class ComponentOrService {
constructor(private injector: Injector) {
}
}
Then, within that Component/Service I do the following (split into several steps, as each step is fairly involved):
all_the_states_with_inner_observables =
computed(() => runInInjectionContext(this.injector,
() => this.all_the_states().map((single) => toObservable(single))
))
states_as_obs_of_obs = toObservable(computed(() =>
merge(...this.all_the_states_with_inner_observables())
))
latest_state_as_obs = this.states_as_obs_of_obs.pipe(mergeAll())
latest_state = toSignal(this.latest_state_as_obs, { initialValue: null })