swift reactive-programming reactive-swift

How to collect values every n seconds with a max count in each interval from a signal in ReactiveSwift?


This looks like a combination of collect(every:on:skipEmpty:discardWhenCompleted:) and collect(count:) in ReactiveSwift. The resulting signal should send the accumulated values every n seconds when their count does not reach the max count within that time interval. If the count does reach the max count within an interval, the accumulated values should be sent immediately.

For example, with timeInterval = 2s and maxCount = 2: [image: enter image description here]


Solution

  • extension SignalProducer {
        /// Collects values into arrays, emitting the current batch as soon as it
        /// holds `count` values or whenever the `every` interval elapses,
        /// whichever happens first.
        ///
        /// The timer runs independently of count-triggered emissions: it is not
        /// restarted when a batch is emitted because it reached `count`. When the
        /// upstream producer completes, any buffered values are emitted as a final
        /// batch before completion is forwarded. Failures and interruptions are
        /// forwarded without emitting the buffered values.
        ///
        /// If `count` is not positive, the count trigger never fires and batches
        /// are emitted only by the timer and on completion.
        ///
        /// - Parameters:
        ///   - count: Number of buffered values that triggers an immediate emission.
        ///   - every: Interval between timer-triggered emissions.
        ///   - scheduler: Scheduler on which upstream events are observed and the
        ///     timer fires. The buffer is mutated from both, so this relies on the
        ///     scheduler running its work serially.
        ///   - skipEmpty: When `true`, a timer tick with nothing buffered emits
        ///     nothing. Defaults to `false`, which emits an empty array on such
        ///     ticks.
        /// - Returns: A producer of value batches.
        func collect(
            count: Int,
            every: DispatchTimeInterval,
            on scheduler: QueueScheduler,
            skipEmpty: Bool = false
        ) -> SignalProducer<[Value], Error> {
            SignalProducer<[Value], Error> { observer, lifetime in
                var collectedValues: [Value] = []

                // Detach the batch from the buffer *before* sending it, so the
                // buffer is already empty if anything downstream reacts
                // synchronously.
                func flush() {
                    let batch = collectedValues
                    collectedValues = []
                    observer.send(value: batch)
                }

                // Upstream: buffer values and flush as soon as the batch is full.
                lifetime += self
                    .observe(on: scheduler)
                    .start { event in
                        switch event {
                        case let .value(value):
                            collectedValues.append(value)
                            if collectedValues.count == count {
                                flush()
                            }

                        case let .failed(error):
                            observer.send(error: error)

                        case .completed:
                            // Do not lose a partially filled batch on completion.
                            if !collectedValues.isEmpty {
                                flush()
                            }
                            observer.sendCompleted()

                        case .interrupted:
                            observer.sendInterrupted()
                        }
                    }

                // Timer: flush whatever has accumulated on every tick. The timer
                // already delivers its values on `scheduler`, so no additional
                // `observe(on:)` hop is needed.
                lifetime += SignalProducer<Date, Never>
                    .timer(interval: every, on: scheduler)
                    .startWithValues { _ in
                        if skipEmpty && collectedValues.isEmpty {
                            return
                        }
                        flush()
                    }
            }
        }
    }
    

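    It collects incoming values into an array and emits that array either as soon as it reaches the given count or when the time interval elapses, whichever happens first. Note that the timer keeps running on its own schedule (it is not reset when a batch is emitted because it reached the count), so a tick that finds nothing buffered emits an empty array, as the output below shows.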

    Test:

    // Running counter; each value that passes the filter takes the current
    // number and then bumps it.
    var counter = 0

    // Value source: a 550 ms timer on the main queue scheduler, keeping only the
    // ticks whose seconds component is a multiple of 3, mapped to consecutive
    // integers and limited to the first 20 values.
    let valuesGenerator = SignalProducer.timer(interval: .milliseconds(550), on: QueueScheduler.main)
        .filter { date in
            Calendar.current.component(.second, from: date).isMultiple(of: 3)
        }
        .map { _ -> Int in
            let current = counter
            counter += 1
            return current
        }
        .take(first: 20)
        .promoteError(Error.self)

    // Observer that logs every event together with the time it was received.
    let loggingObserver = Signal<[Int], Error>.Observer(
        value: { values in
            print("valuesGenerator, date: \(Date()), values: \(values)")
        },
        failed: { error in
            print("valuesGenerator, date: \(Date()), error: \(error)")
        },
        completed: {
            print("valuesGenerator, date: \(Date()), completed")
        },
        interrupted: {
            print("valuesGenerator, date: \(Date()), interrupted")
        }
    )

    // Batch the values: up to 3 per batch, or whatever has arrived every 5 seconds.
    valuesGenerator
        .collect(count: 3, every: .seconds(5), on: QueueScheduler.main)
        .start(loggingObserver)
    

    Output:

    valuesGenerator, date: 2024-02-16 16:37:51 +0000, values: [0, 1, 2]
    valuesGenerator, date: 2024-02-16 16:37:51 +0000, values: [3]
    valuesGenerator, date: 2024-02-16 16:37:56 +0000, values: [4, 5]
    valuesGenerator, date: 2024-02-16 16:38:00 +0000, values: [6, 7, 8]
    valuesGenerator, date: 2024-02-16 16:38:01 +0000, values: [9]
    valuesGenerator, date: 2024-02-16 16:38:06 +0000, values: [10, 11, 12]
    valuesGenerator, date: 2024-02-16 16:38:06 +0000, values: []
    valuesGenerator, date: 2024-02-16 16:38:11 +0000, values: [13, 14]
    valuesGenerator, date: 2024-02-16 16:38:15 +0000, values: [15, 16, 17]
    valuesGenerator, date: 2024-02-16 16:38:16 +0000, values: []
    valuesGenerator, date: 2024-02-16 16:38:21 +0000, values: [18, 19]
    valuesGenerator, date: 2024-02-16 16:38:21 +0000, completed