angulartypescriptrxjs

How To Combine Responses Using RxJS merge() & from()


I have to make n (user defined) http calls. The URLs are generated programatically from user inputs. Each month must be queried separately, so n depends on the date range the user has selected. In the example below, I wish to query 1 year of data, so the array of URLs would look like this:

[
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202301&sql=SELECT * from `EPD_202301` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202302&sql=SELECT * from `EPD_202302` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202303&sql=SELECT * from `EPD_202303` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202304&sql=SELECT * from `EPD_202304` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202305&sql=SELECT * from `EPD_202305` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202306&sql=SELECT * from `EPD_202306` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202307&sql=SELECT * from `EPD_202307` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202308&sql=SELECT * from `EPD_202308` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202309&sql=SELECT * from `EPD_202309` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202310&sql=SELECT * from `EPD_202310` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202311&sql=SELECT * from `EPD_202311` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5",
    "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202312&sql=SELECT * from `EPD_202312` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5"
]

I would like the results to be combined into 1 array of data. I would like each response to be populated in the array as soon as it arrives.

I did a beginner RxJS course and discovered that merge() is most suitable for this. However, the course says to do this:

import { merge } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const url1 = "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202301&sql=SELECT * from `EPD_202301` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5";
const url2 = "https://opendata.nhsbsa.net/api/3/action/datastore_search?resource_id=EPD_202302&sql=SELECT * from `EPD_202302` WHERE BNF_CODE = '0410030C0AAAFAF' AND PRACTICE_CODE = 'Y03641' LIMIT 5";

const arr = [];
const observer = {
    next: (x) => arr.push(x),
    complete: () => console.log(arr),
}

const merged$ = merge(
    ajax.getJSON(url1),
    ajax.getJSON(url2),
);

merged$.subscribe(observer);

This does what I need, but I cannot explicitly state each URL. I don't know how many there will be, so they are stored in an array.

Then I found that from() might help:

from(urls).pipe(
    merge((url: any) => this.getDatastoreSearchMonthly(url))
);

However, I am getting a TypeScript error:

Argument of type 'Observable<unknown>' is not assignable to parameter of type 'OperatorFunction<string, DatastoreSearch[]>'.
  Type 'Observable<unknown>' provides no match for the signature '(source: Observable<string>): Observable<DatastoreSearch[]>'.ts(2345)
No overload matches this call.
  The last overload gave the following error.
    Argument of type '(url: string) => Observable<unknown>' is not assignable to parameter of type 'number | ObservableInput<unknown> | SchedulerLike'.ts(2769)
nhs-api.service.ts(76, 19): Did you mean to call this expression?
merge.d.ts(8, 25): The last overload is declared here.

My service in full:

import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { from, map, merge, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { DatastoreSearch, DatastoreSearchSql } from 'src/app/types/nhs-api/epd';
import * as moment from 'moment';

const baseUrl = 'https://opendata.nhsbsa.net/api/3/action/datastore_search';
type Options = {
    startDate: moment.Moment,
    endDate: moment.Moment,
    practiceCode: string,
    bnfCode: string,
}

@Injectable({
    providedIn: 'root'
})
export class NhsApiService {

    constructor(private http: HttpClient) { }

    public getDatastoreSearch(options: string): Observable<DatastoreSearch> {
        const url = `${baseUrl}?${options}`;
        return this.http.get<DatastoreSearch>(url);
    }

    public getDatastoreSearchSql(resourceId: string, sql: string): Observable<DatastoreSearchSql> {
        const url = `${baseUrl}?resource_id=${resourceId}&sql=${sql}`;
        return this.http.get<DatastoreSearchSql>(url);
    }

    private getDatastoreSearchMonthly(url: string): Observable<DatastoreSearch> {
        return this.http.get<DatastoreSearch>(url);
    }

    private getUrls(options: Options): string[] {
        // Ensure first day of month is selected
        const startDate = options.startDate.startOf('month');
        const endDate = options.endDate.startOf('month');

        const urls = [];

        const includeSearchTerm = (key: string, field: string) => options[key] ? `${field} = '${options[key]}' AND ` : '';
        const removeJoiningTerm = (sql: string, term: string) => sql.endsWith(term) ? sql.substring(0, sql.length - term.length).trim() : sql;

        const getSql = (resourceId: string): string => {
            const tableName = '`' + resourceId + '`';
            let sql = `SELECT * from ${tableName} WHERE `;
            sql += includeSearchTerm('bnfCode', 'BNF_CODE');
            sql += includeSearchTerm('practiceCode', 'PRACTICE_CODE');
            sql = sql.trim();
            sql = removeJoiningTerm(sql, 'AND');
            sql = removeJoiningTerm(sql, 'WHERE');
            sql = `${sql} LIMIT 5`
            return sql;
        }

        for (const m = startDate; m.isSameOrBefore(endDate); m.add(1, 'month')) {
            const dt = m.format('YYYYMM');
            const resourceId = `EPD_${dt}`;
            const sql = getSql(resourceId);
            const url = `${baseUrl}?resource_id=${resourceId}&sql=${sql}`;
            urls.push(url);
        }

        console.log(urls);

        return urls;
    }

    public getMonthlyData(options: Options): Observable<DatastoreSearch[]> {
        const urls = this.getUrls(options);

        return from(urls).pipe(
            merge((url: string) => ajax.getJSON(url))
        );
    }
}

And the component that calls it:

import { Component, OnInit, signal } from '@angular/core';
import { MomentDateAdapter } from '@angular/material-moment-adapter';
import { DateAdapter, MAT_DATE_FORMATS, MAT_DATE_LOCALE } from '@angular/material/core';
import { FormGroup, NonNullableFormBuilder } from '@angular/forms';
import { MONTH_YEAR_FORMATS } from 'src/app/config/dates';
import { DatastoreSearch } from 'src/app/types/nhs-api/epd';
import { Observable } from 'rxjs';
import { NhsApiService } from 'src/app/services/nhs-api/nhs-api.service';
import * as moment from 'moment';

@Component({
    selector: 'app-nhs-api-scratch',
    templateUrl: './scratch.component.html',
    styleUrls: ['./scratch.component.scss'],
    providers: [
        { provide: DateAdapter, useClass: MomentDateAdapter, deps: [MAT_DATE_LOCALE] },
        { provide: MAT_DATE_FORMATS, useValue: MONTH_YEAR_FORMATS },
    ],
})
export class ScratchComponent implements OnInit {

    public form!: FormGroup;

    public data$: Observable<DatastoreSearch[]> = new Observable();

    readonly panelOpenState = signal(false);

    constructor(
        private fb: NonNullableFormBuilder,
        private readonly service: NhsApiService,
    ) { }

    public ngOnInit(): void {
        this.form = this.fb.group({
            practiceCode: this.fb.control('Y03641'),
            bnfCode: this.fb.control('0410030C0AAAFAF'),
            startDate: this.fb.control(moment('2023-01-01')),
            endDate: this.fb.control(moment('2023-12-01')),
        });
    }

    public onSubmit() {
        this.data$ = this.service.getMonthlyData(this.form.value);
    }
}


Solution

  • Try using mergeMap, since you are making API calls for each emissions of an array of observables created by from.

    merge combines multiple streams into a single stream which is when it is used as a source of the stream or as a pipeable operation const example = first.pipe(merge(second));.

    merge - learn rxjs

    public getMonthlyData(options: Options): Observable<DatastoreSearch[]> {
      const urls = this.getUrls(options);
    
      return from(urls).pipe(
        mergeMap((url: string) => ajax.getJSON<DatastoreSearch>(url)),
        toArray()
      );
    }
    

    If the order with the API calls are made (API calls are made as per array order), then we should use concatMap, else mergeMap is enough when the order does not matter.

    We use toArray() to aggregated all the emissions when the final result is emitted.