angular typescript rxjs angular-signals

How do you merge(/mergeAll) a Signal of (an array of) Signals in Angular?


In rxjs-land, you can merge multiple Observables with the merge operator. If you have an array of Observables, you simply spread that array into the merge operator, like so: merge(...arrayOfObservables). And if you have a 'higher-order' Observable (an Observable that emits Observables), you can use mergeAll.
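
To make the difference between the two concrete, here is a rough sketch in plain TypeScript. It is a toy model, not RxJS itself: the Obs type and the toyOf, toyMerge, toyMergeAll and collect helpers are hypothetical stand-ins that only handle synchronous values.

```typescript
// Toy stand-in for an Observable (NOT the RxJS type):
// a function that pushes values to a listener.
type Obs<T> = (next: (value: T) => void) => void;

// Hypothetical helper: emits the given values synchronously.
function toyOf<T>(...values: T[]): Obs<T> {
  return (next) => values.forEach((value) => next(value));
}

// Like merge(...sources): subscribe to every source, forward everything.
function toyMerge<T>(...sources: Obs<T>[]): Obs<T> {
  return (next) => sources.forEach((source) => source(next));
}

// Like mergeAll(): subscribe to each inner Obs as the outer one emits it.
function toyMergeAll<T>(outer: Obs<Obs<T>>): Obs<T> {
  return (next) => outer((inner) => inner(next));
}

// Hypothetical helper: gathers everything a source emits into an array.
function collect<T>(source: Obs<T>): T[] {
  const out: T[] = [];
  source((value) => {
    out.push(value);
  });
  return out;
}

const arrayOfObservables = [toyOf(1, 2), toyOf(3)];

// An array of sources is spread into merge ...
const viaMerge = collect(toyMerge(...arrayOfObservables)); // [1, 2, 3]

// ... while a source that itself emits sources is flattened by mergeAll.
const viaMergeAll = collect(toyMergeAll(toyOf(...arrayOfObservables))); // [1, 2, 3]
```

The key point is that merge needs all of its sources up front, whereas mergeAll picks up inner sources as they arrive.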

I'm looking to do something similar to mergeAll with Angular's new Signals. I have a Signal which holds an array of Signals, all_the_states = signal([] as Signal<any>[]). all_the_states gets added to (and removed from) over time. That works easily enough via mutate. Now I want to create a simple Signal latest_state that always holds the latest value of any of the 'inner' Signals.

I thought I could simply "take a trip to rxjs-land, merge, and go back", i.e. do the following:

latest_state = toSignal(
    merge(...this.all_the_states().map((single) => toObservable(single))),
    { initialValue: null }
  )

However, when I add something to all_the_states, latest_state just stays null.

I've also tried

higher_order_observable: Observable<Observable<any>[]> =
  toObservable(this.all_the_states).pipe(
    map(signals => signals.map(signal => toObservable(signal)))
  )
single_observable: Observable<any> =
  this.higher_order_observable.pipe(mergeAll(), mergeAll())
latest_state = toSignal(this.single_observable, { initialValue: null })

which works as far as the types are concerned (though I'm not sure about the double mergeAll), but in that case Angular throws the error "NG0203: toObservable() can only be used within an injection context such as a constructor, a factory function, a field initializer, or a function used with 'runInInjectionContext'" before execution even gets to the double mergeAll.


Solution

  • Thanks to OZ_ for putting me on the right track! It turns out my initial attempt of

    latest_state = toSignal(
        merge(...this.all_the_states().map((single) => toObservable(single))),
        { initialValue: null }
      )
    

    wasn't too far off. I "just" needed to move the read of all_the_states into a reactive context (a computed) and the toObservable calls into an injection context, by wrapping them in Angular's runInInjectionContext with a local Injector. An additional mergeAll was also needed.

    So, here's the code: In the Angular Component or - in my case - Service, get a hold of an Injector:

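    // computed, runInInjectionContext, toObservable, toSignal, merge and mergeAll are used in the next step
    import { Injector, computed, runInInjectionContext } from '@angular/core';
    import { toObservable, toSignal } from '@angular/core/rxjs-interop';
    import { merge, mergeAll } from 'rxjs';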
    
    class ComponentOrService {
      constructor(private injector: Injector) {
      }
    }
    

    Then, within that Component/Service I do the following (split into several steps, as each step is fairly involved):

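    // Step 1: re-map the inner Signals to Observables whenever all_the_states changes;
    // runInInjectionContext supplies the injection context that toObservable requires
    all_the_states_with_inner_observables =
      computed(() => runInInjectionContext(this.injector,
        () => this.all_the_states().map((single) => toObservable(single))
      ))
    // Step 2: merge the inner Observables and expose each merged result as an emission,
    // which yields an Observable of Observables
    states_as_obs_of_obs = toObservable(computed(() =>
      merge(...this.all_the_states_with_inner_observables())
    ))
    // Step 3: flatten the Observable of Observables into a single Observable
    latest_state_as_obs = this.states_as_obs_of_obs.pipe(mergeAll())
    // Step 4: go back to signal-land
    latest_state = toSignal(this.latest_state_as_obs, { initialValue: null })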
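
One way to see why the first attempt stays null: the field initializer calls this.all_the_states() exactly once, while the array is still empty, so merge receives no sources and never learns about later additions. The following is a minimal sketch of that effect in plain TypeScript. ToySignal, the two log arrays and the wired list are hypothetical stand-ins for illustration only, not Angular's Signal API.

```typescript
// Toy stand-in for a writable signal (NOT Angular's implementation):
// it stores a value and notifies listeners on every set().
class ToySignal<T> {
  private value: T;
  private listeners: Array<(value: T) => void> = [];

  constructor(initial: T) {
    this.value = initial;
  }

  get(): T {
    return this.value;
  }

  set(value: T): void {
    this.value = value;
    this.listeners.forEach((listener) => listener(value));
  }

  // Calls the listener with the current value, then again on every change.
  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
    listener(this.value);
  }
}

const allTheStates = new ToySignal<ToySignal<string>[]>([]);

// Snapshot read: the outer array is read once, while it is still empty,
// so no inner signal is ever subscribed to.
const snapshotLog: string[] = [];
allTheStates.get().forEach((inner) => inner.subscribe((v) => snapshotLog.push(v)));

// Reactive read: the mapping re-runs whenever the outer array changes,
// so inner signals added later are picked up.
const reactiveLog: string[] = [];
const wired: ToySignal<string>[] = [];
allTheStates.subscribe((inners) => {
  for (const inner of inners) {
    if (wired.indexOf(inner) !== -1) continue; // already subscribed
    wired.push(inner);
    inner.subscribe((v) => reactiveLog.push(v));
  }
});

const a = new ToySignal("a1");
allTheStates.set([a]);
a.set("a2");
// snapshotLog is still empty; reactiveLog is ["a1", "a2"]
```

In the solution above, computed plays the role of the re-running subscription: it re-reads all_the_states every time the array changes.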