angular rxjs rxjs-observables

Angular: recursive array of observables fires complete before the real end


I have an object that can have children, and a function that inserts or updates an instance (and its children). But I have a problem with observables.

When I use forkJoin, the HTTP request (createSite or updateSite) is never executed and complete fires instantly. With combineLatest the calls are executed, but the complete event fires before all the observables have finished.

I have also tried expand for the children, but it loops and creates the same instance many times.

function save(): void {
  combineLatest(this.updatedSites.map(s => this.upsertSites(s)))
    .subscribe({
       complete: () => { /* some actions */ },
    });
}

upsertSites(site: Site, parentId?: number): Observable<any> {
  const sourceSite: Site | undefined = this.sourceSites.map(s => s.findSite(site.id).at(-1)).filter(s => !!s).at(-1);
  if (sourceSite && !this.isSiteDeepEqualWithoutChildren(sourceSite ?? { label: '' }, site)) {
    return this.siteService.updateSite(site.id, site.label, parentId)
      .pipe(
        catchError(error => {
          console.log(error);
          return combineLatest(site.children.map((s: Site) => this.upsertSites(s, site.id)));
        }),
        mergeMap(() => combineLatest(site.children.map((s: Site) => this.upsertSites(s, site.id)))),
      );
  } else if (!sourceSite) {
    return this.siteService.createSite(site.label, parentId)
      .pipe(
        mergeMap(data => {
          site.id = data.id;
          return combineLatest(site.children.map((s: Site) => this.upsertSites(s, site.id)));
        }),
      );
  } else {
    return combineLatest(site.children.map((s: Site) => this.upsertSites(s, site.id)));
  }
}

What did I do wrong? What did I misunderstand?


Solution

  • After some thinking, I found a solution. Here it is in case someone needs it:

    function save(): void {
      forkJoin(this.updatedSites.map(s => this.upsertSites(s)))
        .subscribe({
           complete: () => { /* some actions */ },
        });
    }
    
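    upsertSites(site: Site, parentId?: number): Observable<any> {
      return new Observable((subscriber) => {
        // Emit one value up front; otherwise forkJoin would complete as soon as
        // any source completes without having emitted.
        subscriber.next(site);
    
        // Save all children, then complete this observable.
        // With no children, forkJoin([]) completes immediately, which ends the recursion.
        const upsertChildren = (site: Site) => forkJoin(site.children.map((s: Site) => this.upsertSites(s, site.id)))
          .subscribe({
            complete: () => subscriber.complete(),
          });
    
        const sourceSite: Site | undefined = this.sourceSites.map(s => s.findSite(site.id).at(-1)).filter(s => !!s).at(-1);
        if (sourceSite && !this.isSiteDeepEqualWithoutChildren(sourceSite ?? { label: '' }, site)) {
          // Existing site that changed: update it, then save the children even if the update fails.
          this.siteService.updateSite(site.id, site.label, parentId)
            .subscribe({
              next: () => upsertChildren(site),
              error: error => {
                console.log(error);
                upsertChildren(site);
              },
            });
        } else if (!sourceSite) {
          // New site: create it first so the children receive the new parent id.
          this.siteService.createSite(site.label, parentId)
            .subscribe({
              next: data => {
                site.id = data.id;
                upsertChildren(site);
              },
              error: error => {
                // Without a parent id the children cannot be saved, so stop here.
                console.log(error);
                subscriber.complete();
              },
            });
        } else {
          // Unchanged site: only the children may need saving.
          upsertChildren(site);
        }
      });
    }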
    

    I use forkJoin and it works now. In fact, I had missed this piece of information in the documentation:

    [...] complete immediately if an empty array is passed

    RXJS - forkJoin
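
To make the quoted behaviour concrete, here is a small sketch (assuming RxJS 7; the `events` array and its labels exist only for this illustration). It shows that forkJoin completes without ever calling next when the input array is empty, and also when one of its sources completes without emitting:

```typescript
import { EMPTY, forkJoin, of } from 'rxjs';

// Illustration-only log of what each subscription observed.
const events: string[] = [];

// Case 1: empty input array. forkJoin completes at once and never calls next.
forkJoin([]).subscribe({
  next: () => events.push('empty:next'),
  complete: () => events.push('empty:complete'),
});

// Case 2: one source (EMPTY) completes without emitting.
// forkJoin completes, but again without calling next.
forkJoin([of('a'), EMPTY]).subscribe({
  next: () => events.push('partial:next'),
  complete: () => events.push('partial:complete'),
});

// Case 3: every source emits before completing, so forkJoin emits the last values once.
forkJoin([of('a'), of('b')]).subscribe({
  next: values => events.push('full:next ' + values.join(',')),
  complete: () => events.push('full:complete'),
});

console.log(events);
```

In a recursive upsert, a leaf with no children corresponds to case 1, and a parent waiting on such a leaf corresponds to case 2, which is probably why complete fired earlier than expected.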

    So I emit a value with next and then complete my custom observable. The one thing I find dirty is that the value passed to next is never used, but it works!
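
A possible alternative that avoids the unused value and the nested subscribe calls is to let `defaultIfEmpty` supply the missing emission. The following is only a sketch under assumptions: `SiteNode`, `createSite`, `upsertSite`, `upsertChildren` and `calls` are hypothetical stand-ins, not the original `Site` class or `siteService`, and only the "create" and "unchanged" branches are modelled. An update branch with `catchError` would presumably follow the same shape.

```typescript
import { Observable, defer, forkJoin, of } from 'rxjs';
import { defaultIfEmpty, map, mergeMap } from 'rxjs/operators';

// Hypothetical, simplified site shape (not the original Site class).
interface SiteNode {
  id?: number;
  label: string;
  children: SiteNode[];
}

// Hypothetical stand-in for siteService.createSite:
// records each call and returns a fresh id synchronously.
const calls: string[] = [];
let nextId = 1;
function createSite(label: string, parentId?: number): Observable<{ id: number }> {
  return defer(() => {
    calls.push(`${label}<-${parentId ?? 'none'}`);
    return of({ id: nextId++ });
  });
}

// Save every child. defaultIfEmpty guarantees exactly one emission,
// even when forkJoin receives an empty array and completes silently.
function upsertChildren(site: SiteNode): Observable<null> {
  return forkJoin(site.children.map(child => upsertSite(child, site.id))).pipe(
    map(() => null),
    defaultIfEmpty(null),
  );
}

// Create the site if it has no id yet, then save its children.
function upsertSite(site: SiteNode, parentId?: number): Observable<null> {
  const self$: Observable<null> = site.id === undefined
    ? createSite(site.label, parentId).pipe(
        map(data => {
          site.id = data.id;
          return null;
        }),
      )
    : of(null);
  // The children observables are built only after the parent id is known.
  return self$.pipe(mergeMap(() => upsertChildren(site)));
}

// Usage: a small tree with one nested child.
const tree: SiteNode = {
  label: 'root',
  children: [
    { label: 'a', children: [] },
    { label: 'b', children: [{ label: 'b1', children: [] }] },
  ],
};

forkJoin([upsertSite(tree)]).subscribe({
  complete: () => calls.push('complete'),
});

console.log(calls);
```

Because every level always emits once, the outer forkJoin in this sketch only completes after all nested creations have run, and each child is created with the id of its parent.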