Tags: javascript, rxjs, rxjs-observables

Keep values on hold and compare new values with those on hold


Using one Observable in RxJS, all incoming values should be treated like such:

How do I do that? (Tried to fiddle with scan, mergeScan, windowTime and some other operators but could not figure it out.)

Edit: As requested in the comments, here is my current code snippet, although it probably won't help much:

const { from } = require('rxjs');
const { windowTime, concatMap, distinct, tap, filter, map, debounceTime, distinctUntilChanged, scan, mergeScan } = require('rxjs/operators');

// Sample input: user-scoped records (some repeated back to back) followed by
// two records that only carry `otherIds`.
const myInpExamples = [
    { userId: 'userA', ids: ['aa'] },
    { userId: 'userB', ids: ['cc'] },
    { userId: 'userA', ids: ['aa'] },
    { userId: 'userA', ids: ['aa'] },
    { userId: 'userB', ids: ['bb'] },
    { otherIds: ['11'] },
    { otherIds: ['22'] },
];

// Source observable built from the sample array.
const arrayDataObservable$ = from(myInpExamples);

// Current attempt: split the stream into 1000 ms windows and, inside each
// window, drop a value when it serializes to the same JSON as the value
// directly before it.
const dataPipeline = arrayDataObservable$.pipe(
    windowTime(1000),
    concatMap((window$) => {
        const dropConsecutiveDuplicates = distinctUntilChanged(
            (previous, current) => JSON.stringify(previous) === JSON.stringify(current),
        );
        return window$.pipe(dropConsecutiveDuplicates);
    }),
);

/**
 * Subscribes to `dataPipeline` and logs every value it delivers, prefixed
 * with the subscriber's name.
 *
 * @param {string} subscriberName - Label placed in front of each log line.
 * @returns {*} Whatever `dataPipeline.subscribe` returns.
 */
const subscribeToDataPipeline = (subscriberName) => {
    const logReceivedValue = (received) => {
        const prefix = subscriberName + ' received: ';
        console.log(prefix + JSON.stringify(received, null, 2));
    };

    return dataPipeline.subscribe(logReceivedValue);
};

/**
 * Starts the demo by attaching a single logging subscriber named
 * 'Subscriber1'.
 *
 * @returns {*} The value returned by `subscribeToDataPipeline`, handed back
 *   to the caller instead of being discarded so the subscription can be
 *   torn down later if needed.
 */
const handleSubscriptionToDataPipeline = () => {
    return subscribeToDataPipeline('Subscriber1');
};

// Run the demo: subscribes 'Subscriber1' to the pipeline, which logs each value it receives.
handleSubscriptionToDataPipeline();

Expected outcome here is that Subscriber1 gets those values:


Solution

  • Try something like this:

    Manipulate the data manually using plain JavaScript inside switchMap, then re-emit the results with RxJS's `of`.

    // Needs `of` from 'rxjs' plus `bufferTime` and `switchMap` from 'rxjs/operators'.
    arrayDataObservable$.pipe(
        // Collect the values emitted during each 1000 ms interval into an array.
        bufferTime(1000),
        switchMap((buffered) => {
            // Keep only the first occurrence of each value in the buffer,
            // comparing values by their JSON serialization. Filtering the
            // buffer itself keeps the survivors in their original order.
            const seenKeys = new Set();
            const distinctValues = buffered.filter((el) => {
                const key = JSON.stringify(el);
                if (seenKeys.has(key)) {
                    return false;
                }
                seenKeys.add(key);
                return true;
            });
            // Re-emit the surviving values one at a time.
            return of(...distinctValues);
        }),
    )
    

    The data-manipulation step could be optimized further; treat this version as a reference.