kotlin, kotlin-coroutines, kotlin-coroutine-channel

Kotlin: Using a channel to make a simple work counter


I want to make a thread-safe counter that some of my services can use to count concurrent units of work.

For example, an HTTP service can serve multiple requests at once and exposes an isWorking property that is later used to display a spinner.

This is my implementation of the counter:

class CounterImpl @Inject constructor(
    @IoDispatcher private val ioDispatcher: CoroutineDispatcher,
    @MainDispatcher private val mainDispatcher: CoroutineDispatcher,
    private val log: Log
) : Counter {
    private val channel = Channel<Int>()
    private val _isWorking = MutableLiveData(false)
    override val isWorking = _isWorking.asFlow()

    init {
        MainScope().launch {
            channel.consumeAsFlow().onEach {
                log(FILE, "counter got a $it", LogType.Debug)
            }.scan(0) { acc, value -> acc + value }.map { it > 0}
                .collect {
                    withContext(mainDispatcher) {
                        _isWorking.value = it
                    }
                }
        }
    }

    override suspend fun invoke(increment: Boolean) {
        log(FILE, "counter invoked with $increment", LogType.Debug)
        channel.send(if (increment) 1 else -1)
    }
}

The problem is that sometimes the last send call to the channel never reaches the consumeAsFlow part of the code.

Here's a sample log of what happens:

[Debug] Counter: counter invoked with true
[Debug] Counter: counter invoked with false
[Debug] Counter: counter got a 1

Here, invoke is called once with true, and the line counter got a 1 corresponds to that increment call. But there is also an invocation with false, so I would expect a corresponding counter got a -1 line (onEach logs the raw value sent to the channel, before scan sums it). That line never appears.

I also tried iterating over the channel with for (c in channel), in case that's what you're thinking.


Solution

  • You don't need a Channel for this. A MutableStateFlow is enough, because the operation you are performing (incrementing or decrementing a number) has no side effects.

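        import kotlinx.coroutines.flow.MutableStateFlow
        import kotlinx.coroutines.flow.update

        // Current number of in-flight units of work.
        val count = MutableStateFlow(0)

        // Atomically adjusts the counter; safe to call from any thread.
        fun update(increment: Boolean) {
            count.update { it + if (increment) 1 else -1 }
        }
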
    Note: the lambda passed to update {} may be executed more than once if another thread changes the value concurrently (update retries with compare-and-set), so it must NOT have side effects.
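
To connect this back to the isWorking property from the question, here is a minimal sketch of how the counter could be wrapped. The class name WorkCounter is hypothetical, and the Log, LiveData and injected dispatchers from the original CounterImpl are left out on purpose; this is one possible shape under those assumptions, not a drop-in replacement.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical class name; stands in for the question's CounterImpl.
class WorkCounter {
    // Number of units of work currently in flight.
    private val count = MutableStateFlow(0)

    // Emits true while at least one unit of work is in flight.
    val isWorking: Flow<Boolean> = count.map { it > 0 }.distinctUntilChanged()

    // Not a suspend function: update() is a compare-and-set loop,
    // so there is no channel and no consumer coroutine to wait for.
    operator fun invoke(increment: Boolean) {
        count.update { it + if (increment) 1 else -1 }
    }
}

fun main() = runBlocking {
    val counter = WorkCounter()

    counter(true)
    println(counter.isWorking.first()) // true

    counter(false)
    println(counter.isWorking.first()) // false

    // Many concurrent increment/decrement pairs on a thread pool.
    coroutineScope {
        repeat(1_000) {
            launch(Dispatchers.Default) {
                counter(true)
                counter(false)
            }
        }
    }
    println(counter.isWorking.first()) // false
}
```

Because invoke no longer suspends, it can also be called from a finally block after the calling coroutine has been cancelled. A suspending channel.send can be cancelled before it delivers its element, which is one possible (unconfirmed) explanation for the missing decrement in the question's log.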