firebase-realtime-databaserxjsangular-akitaakita

RxJS get array of emissions at group time. Firebase triggers


I'm new into the RxJS world and I'm kinda lost with this. Hope someone can help me out.

I have a source observable (Firebase through AngularFire) which constantly emits me a lot of data (up to 50 or 80 emissions in a 2s window) in random pike times, since this is something that's slowing down my project performance, I thought the correct way to handle this would be to group the emissions in an array and after, do a transaction with all the data received and insert it in the store.

The result I'm looking for is something like the following:

Taking into account that I'd place a "hold" amount of time of 3s, I'd like the following result:

           1s     1.5s
--> 30 --> 60 --> 100

          1s           2s
--> 5 --> 1 --> 50 --> 70
  1. [30, 60, 100] --> in 1.5s interval time
  2. [5, 1, 50, 70] --> in 2s interval time

The values in the array would be the emissions received in that specific time starting from the first emission received. After that specific amount of time, it would "restart" an initialize in the next batch of emissions (which could actually be in 1 second or in 2 hours, but then, the interval would trigger for 2s the emissiones got)

What I've tried so far was using Window and Buffer, maybe I'm not using these correctly or I'm just dumb but I can't find to get the result I just explained.

filter((snapshot) => { if (snapshot.payload.val().reference) { return snapshot; } }),
window(interval(2000)),
mergeAll(),
withTransaction((snapshots:[]) => {
   snapshots.forEach(snapshot => {
     if (snapshot.type === 'child_changed') {

       this.store.add(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_changed') {

       this.store.replace(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_removed') {

       this.store.remove(snapshot.key);

     }
   })
})

I don't even know if it's possible with RxJS (I guess so. I have seen many cool things around) but any suggestions or a guide for making it through this, would be greatly appreciated.

Thanks a lot in advance!

note: withTransaction is a custom operator.


Solution

  • not positive what you're after, but seems like you want bufferTime?

    const source = timer(0, 500);
    const buffered = source.pipe(bufferTime(3000));
    buffered.subscribe(val => console.log(val));
    

    this will emit all values collected within the buffer period as an array every 3 seconds.

    blitz demo: https://stackblitz.com/edit/rxjs-vpu97e?file=index.ts

    in your example I THINK you'd just use it as:

    filter((snapshot) => snapshot.payload.val().reference), // this is all you need for filter
    bufferTime(2000),
    withTransaction((snapshots:[]) => {
       ...
    })