My application includes various widgets indicating realtime values (temperature, cpu usage, memory, etc....). These widgets are updated from JSON messages received through a single websocket.
At the moment, the updates are handled in realtime, so I update my widget as soon as a new value is received.
The messages are as follow: {widget: "widgetId", value: 150}
.
My issue is that sometimes there is multiple updates per second for the same widget and it makes the screen blink and has a useless performance cost. I want to reduce this by just delaying the UI update to only update with the latest received value for a period of time.
I would like to explore the two following options:
Update all widgets at the same time once every X seconds (by keeping only the most recent update for each widget and sending them all at once to the UI).
Update a widget at most once every X seconds (a debounce by channel).
My socket handler looks like this:
socket.receive(completionHandler: { [weak self] result in
switch result {
case .success(let message):
switch message {
case .data(let data):
guard let update = try? WidgetUpdate(serializedData: data) else {
print("Failed to deserialize data")
return
}
DispatchQueue.main.async {
self?.updateWidget(update: update)
}
@unknown default:
print("unknown")
}
}
}
)
What I tried so far is to use the PassthroughSubject
like this:
let subject = PassthroughSubject<WidgetUpdate, Never>()
cancellable = subject
.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
.sink { update in
self?.updateWidget(update: update)
}
// And to publish inside the publisher from the socket handler like this
subject.send(update)
The problem is that it obviously debounces "all" updates and not by widget.
I would like to avoid creating one PassthroughSubject by widget as widget can be added or deleted dynamically and I don't want to have to manage the creation / deletion of the associated publishers.
I think what will help you is the ReactiveX GroupBy operator. Unfortunately, Combine does not come with a native GroupBy, but in can be implemented in terms of a scan:
import Combine
import Foundation
extension Publisher {
public func grouped<Key: Hashable> (
by key: @escaping (Self.Output) -> Key
) -> some Publisher<CurrentValueSubject<Self.Output, Never>, Self.Failure> {
scan(([:], nil)) { (accum: ([Key: CurrentValueSubject<Self.Output, Never>], CurrentValueSubject<Self.Output, Never>?), next) in
var (accumPubs, _) = accum
if let existingSubject = accumPubs[key(next)] {
existingSubject.send(next)
return (accumPubs, nil)
} else {
let newSubject = CurrentValueSubject<Self.Output, Never>(next)
accumPubs[key(next)] = newSubject
return (accumPubs, newSubject)
}
}.compactMap { $0.1 }
}
}
Once you have that, I believe something like this should solve your problem (this is based on your original example):
struct Element {
let channel: String
let updateNum: Int
}
var subscriptions: Set<AnyCancellable> = Set()
withExtendedLifetime(subscriptions) {
let pub1 = Just(Element(channel: "chan1", updateNum: 1)).delay(for: .seconds(0.1), scheduler: RunLoop.main)
let pub2 = Just(Element(channel: "chan2", updateNum: 1)).delay(for: .seconds(0.2), scheduler: RunLoop.main)
let pub3 = Just(Element(channel: "chan1", updateNum: 2)).delay(for: .seconds(0.2), scheduler: RunLoop.main)
let pub4 = Just(Element(channel: "chan1", updateNum: 3)).delay(for: .seconds(0.3), scheduler: RunLoop.main)
let pub5 = Just(Element(channel: "chan2", updateNum: 2)).delay(for: .seconds(0.4), scheduler: RunLoop.main)
let pub6 = Just(Element(channel: "chan2", updateNum: 3)).delay(for: .seconds(0.5), scheduler: RunLoop.main)
let pub7 = Just(Element(channel: "chan1", updateNum: 4)).delay(for: .seconds(0.6), scheduler: RunLoop.main)
let pub = Publishers.MergeMany(pub1, pub2, pub3, pub4, pub5, pub6, pub7)
pub
.grouped { $0.channel }
.flatMap {
$0.debounce(for: .seconds(0.3), scheduler: RunLoop.main)
// Or:
// $0.throttle(for: .seconds(0.3), scheduler: RunLoop.main, latest: true)
}
.sink { result in print(result) }
.store(in: &subscriptions)
}
RunLoop.main.run()
And here's something based on your actual WebSockets problem:
let subject = PassthroughSubject<WidgetUpdate, Never>()
cancellable = subject
.grouped { $0.widget }
.flatMap {
$0.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
// Or:
// $0.throttle(for: .seconds(0.5), scheduler: RunLoop.main, latest: true)
}
.sink { update in
self?.updateWidget(update: update)
}
// And to publish inside the publisher from the socket handler like this
subject.send(update)
Also, it sounds to me like what you want is really to throttle, rather than debounce, though for your use case, most users probably won't notice the difference between the two. As long as your group your publishers, you can choose to either to throttle or debounce.