javascript angular rxjs observable concatmap

RxJS concatMap is emitting only one HTTP stream out of many


I have been trying to get a stream of objects in sequential order, but concatMap is not working for me. With mergeMap I get the stream, but not in sequential order.

Below is the code I am trying:

this.result
    .pipe(
        map((result: Result) => {
            return result.contents
            .map((content: Content) =>  this.databaseService
                .getResource<Resource>('Type', content.key) // http call in service
            )
        }),
        mergeMap((requests: Observable<Resource>[]) => {
            return from(requests)
            .pipe(
                concatMap((resource: Observable<Resource>) => resource), // ----> Trigger only once.
                filter((resource:Resource) => resource.status === 'active'),
                skip(this.pageIndex * this.PAGE_SIZE),
                take(this.PAGE_SIZE),
            )
        }),
    )
    .subscribe({
        next: (resource: Resource) => {
            console.log(resource) // resource stream
        },
        complete: () => console.log('completed')
    });


Solution

  • concatMap will only process one observable source at a time. It will not process subsequent sources until the current source completes.

    In your case the observable from this.databaseService.getResource() is not completing. To force it to complete after the first emission, you could append a take(1) like this:

    concatMap((resource: Observable<Resource>) => resource.pipe(take(1)))
    

    Alternatively, if the call to databaseService.getResource() is only meant to emit a single value, you could modify your service to emit only a single value.

    // http call in service

    Note: If you are using the Angular HttpClient, a typical get request will complete when the response is received. So you can probably have your getResource() method return the observable from the http.get() call.
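To see the behavior described above in isolation, here is a minimal sketch. It assumes the service observable emits once and then stays open; `neverCompleting` is a hypothetical stand-in for `getResource()`, not the real service, and plain strings stand in for `Resource` objects.

```typescript
import { NEVER, Observable, concat, from, of } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';

// Hypothetical stand-in for getResource(): emits one value, then never completes.
function neverCompleting(key: string): Observable<string> {
  return concat(of(`resource-${key}`), NEVER);
}

const requests: Observable<string>[] = ['a', 'b', 'c'].map(neverCompleting);

// Without take(1): concatMap waits forever for the first inner observable
// to complete, so the second and third are never subscribed.
const withoutTake: string[] = [];
from(requests)
  .pipe(concatMap((request) => request))
  .subscribe((value) => withoutTake.push(value));

// With take(1): each inner observable completes after its first value,
// so concatMap moves on to the next one, in order.
const withTake: string[] = [];
let completed = false;
from(requests)
  .pipe(concatMap((request) => request.pipe(take(1))))
  .subscribe({
    next: (value) => withTake.push(value),
    complete: () => { completed = true; },
  });

console.log(withoutTake); // [ 'resource-a' ]
console.log(withTake);    // [ 'resource-a', 'resource-b', 'resource-c' ]
console.log(completed);   // true
```

If your real `getResource()` already completes after one emission (as an HttpClient get typically does), the `take(1)` is harmless but not needed.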