I want to make a thread-safe counter that some of my services can use to count concurrent units of work.
Say there's an HTTP service that can serve multiple requests and has a property isWorking
that is later used to display a spinner.
This is my implementation of the counter:
class CounterImpl @Inject constructor(
    @IoDispatcher private val ioDispatcher: CoroutineDispatcher,
    @MainDispatcher private val mainDispatcher: CoroutineDispatcher,
    private val log: Log
) : Counter {
    private val channel = Channel<Int>()
    private val _isWorking = MutableLiveData(false)
    override val isWorking = _isWorking.asFlow()

    init {
        MainScope().launch {
            channel.consumeAsFlow()
                .onEach { log(FILE, "counter got a $it", LogType.Debug) }
                .scan(0) { acc, value -> acc + value }
                .map { it > 0 }
                .collect {
                    withContext(mainDispatcher) {
                        _isWorking.value = it
                    }
                }
        }
    }

    override suspend fun invoke(increment: Boolean) {
        log(FILE, "counter invoked with $increment", LogType.Debug)
        channel.send(if (increment) 1 else -1)
    }
}
So the problem is that sometimes the last send call to the channel never reaches the consumeAsFlow
part of the code.
Here's a sample log of what happens:
[Debug] Counter: counter invoked with true
[Debug] Counter: counter invoked with false
[Debug] Counter: counter got a 1
Here, invoke is called once with true, and there's a line that says counter got a 1 that corresponds to that true (increment) call.
But there's also an invocation with false, and I would expect a corresponding counter got a -1 line (the log prints the value sent to the channel, which is -1 for a decrement).
That line never appears.
I also tried iterating the channel with for (c in channel), if that's what you're thinking.
You don't need a Channel for that. A MutableStateFlow is enough, because the operation you are doing (incrementing/decrementing a number) is free of side effects.
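import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

val count = MutableStateFlow(0)
fun update(increment: Boolean) {
    // Atomic read-modify-write on the current value; no channel or lock needed.
    count.update { it + if (increment) 1 else -1 }
}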
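To connect this back to the isWorking property from the question, here is a minimal sketch of how the count could be exposed as a Flow<Boolean>. The names WorkCounter and track are hypothetical (they are not in the question or in any library), and the sketch leaves out the injected dispatchers and logger.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for CounterImpl from the question.
class WorkCounter {
    private val count = MutableStateFlow(0)

    // Emits true while at least one unit of work is in flight.
    // distinctUntilChanged() suppresses repeats such as 1 -> 2 (true -> true).
    val isWorking: Flow<Boolean> = count.map { it > 0 }.distinctUntilChanged()

    fun update(increment: Boolean) {
        count.update { it + if (increment) 1 else -1 }
    }

    // Hypothetical helper: pairs the increment with a guaranteed decrement,
    // even if block throws or the coroutine is cancelled.
    suspend fun <T> track(block: suspend () -> T): T {
        update(true)
        return try {
            block()
        } finally {
            update(false)
        }
    }
}

fun main() = runBlocking {
    val counter = WorkCounter()
    println(counter.isWorking.first())                      // false
    val during = counter.track { counter.isWorking.first() }
    println(during)                                         // true
    println(counter.isWorking.first())                      // false
}
```

Because update is a plain (non-suspending) function here, it can be called from a finally block without worrying about cancellation, which is harder to get right with a suspending channel.send.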
Note: the function passed to update {} may be executed multiple times (it is retried when another thread changes the value concurrently), so it must NOT have side effects.
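The retry behaviour can be observed with a small experiment. This is only a sketch: the function name contendedUpdates and the lambdaRuns counter are made up for illustration, and the counter is deliberately a side effect inside the lambda (exactly what you should not do in real code) so that the extra executions become visible.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs n concurrent increments and returns
// (final count, number of times the update lambda actually executed).
suspend fun contendedUpdates(n: Int): Pair<Int, Int> {
    val count = MutableStateFlow(0)
    val lambdaRuns = AtomicInteger(0) // side effect, present only to count retries

    // coroutineScope waits for all launched children before returning.
    coroutineScope {
        repeat(n) {
            launch(Dispatchers.Default) {
                count.update { current ->
                    lambdaRuns.incrementAndGet()
                    current + 1
                }
            }
        }
    }
    return count.value to lambdaRuns.get()
}

fun main() = runBlocking {
    val (finalCount, runs) = contendedUpdates(1_000)
    println("count = $finalCount")  // 1000: every update is applied exactly once
    println("lambda runs = $runs")  // at least 1000; higher whenever a retry happened
}
```

The final count is always correct, but the number of lambda executions can exceed the number of update calls, which is why logging, sending to a channel, or any other side effect does not belong inside the lambda.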