
Kotlin Coroutines channels send inside fixedRateTimer


I'm working on a hobby project using Kotlin Coroutines for the first time. I have read about them and watched videos, and I kinda get the concept. But I'm stuck on a problem. Let me show you my code.

package com.dev.tuber.ingestion.snapshots

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import org.joda.time.LocalTime
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.fixedRateTimer

object SnapshotsBuffer {
    private val buffer = ConcurrentHashMap<Int, MutableMap<Int, Queue<Snapshot>>>()

    init {
        for (minute in 0..59) {
            buffer[minute] = mutableMapOf()
        }
    }

    suspend fun start(snapshotsChannel: Channel<Snapshot>, composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
        startComposing(composeSnapshots)

        for (snapshot in snapshotsChannel) {
            val currentMinute = getCurrentMinute()
            if (!buffer[currentMinute]!!.containsKey(snapshot.pair.id)) {
                buffer[currentMinute]!![snapshot.pair.id] = LinkedList()
            }

            buffer[currentMinute]!![snapshot.pair.id]!!.add(snapshot)
            println(buffer)
        }
    }

    private fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
        val oneMinute = (1000 * 60).toLong()

        fixedRateTimer("consuming", true, oneMinute, oneMinute) {
            val previousMinute = getPreviousMinute()
            composeSnapshots.send(buffer[previousMinute]!!) // <---- cannot do this
            buffer[getPreviousMinute()] = mutableMapOf()
        }
    }

    private fun getCurrentMinute(): Int {
        return LocalTime().minuteOfHour
    }

    private fun getPreviousMinute(): Int {
        val currentMinute = getCurrentMinute()

        if(currentMinute == 0) return 59
        return currentMinute - 1
    }
}

So, I have two channels. The first channel is snapshotsChannel; this is where each Snapshot arrives. I want to buffer the Snapshots and, whenever a minute passes, send the buffer to the composeSnapshots channel for further processing.

Basically I get A LOT OF Snapshots and I don't want to send them directly to further processing. That's why I want to buffer them per minute, per pair.

The problem arises in the startComposing function. The lambda passed to fixedRateTimer is not a suspending function, so I cannot call send there. I'm kinda stuck now because I cannot find a solution for this. I have looked into TickerChannel and Kotlin Flow, but neither seems like the right solution for my problem.

Do you know a solution?


Solution

  • You cannot call a suspend function (suspend fun Channel.send(element: E)) from a non-suspending function.

    The coroutine way is to run an infinite loop that suspends itself for a minute and then sends to the channel, over and over. The great thing is that delay cooperates with cancellation: if the coroutine is cancelled while it is suspended in delay, the loop stops.

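    // Requires: import kotlinx.coroutines.delay
    private suspend fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
        val oneMinute = 60_000L

        // Runs until the enclosing coroutine is cancelled.
        while (true) {
            // Suspends without blocking a thread; throws CancellationException on cancel.
            delay(oneMinute)

            // Hand over the finished minute's bucket, then replace it with an empty one.
            val previousMinute = getPreviousMinute()
            composeSnapshots.send(buffer[previousMinute]!!)
            buffer[previousMinute] = mutableMapOf()
        }
    }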
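
    One thing worth spelling out: because startComposing now loops forever, calling it directly at the top of start would suspend there and never reach the loop over snapshotsChannel. It has to run in its own coroutine, for example via launch inside a coroutineScope. Below is a minimal, self-contained sketch of that shape. Item, bufferAndFlush and periodMillis are hypothetical names made up for the illustration (Item stands in for Snapshot), and the Mutex plus the final flush on close are assumptions of this sketch, not part of the original code.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the question's Snapshot type.
data class Item(val pairId: Int, val value: Int)

// Groups incoming items by pairId and sends the accumulated batch to `output`
// every `periodMillis`. Returns once `input` has been closed and drained.
suspend fun bufferAndFlush(
    input: Channel<Item>,
    output: Channel<Map<Int, List<Item>>>,
    periodMillis: Long,
) = coroutineScope {
    val mutex = Mutex() // guards `batch`, which two coroutines touch
    var batch = mutableMapOf<Int, MutableList<Item>>()

    // The periodic sender gets its own coroutine, so its endless loop
    // does not stop the consuming loop below from running.
    val flusher = launch {
        while (true) {
            delay(periodMillis)
            // Swap in a fresh map under the lock, then send the old one outside it.
            val full = mutex.withLock { batch.also { batch = mutableMapOf() } }
            if (full.isNotEmpty()) output.send(full)
        }
    }

    for (item in input) {
        mutex.withLock { batch.getOrPut(item.pairId) { mutableListOf() }.add(item) }
    }

    // Input is closed: stop the timer loop and flush whatever is left.
    flusher.cancel()
    val rest = mutex.withLock { batch }
    if (rest.isNotEmpty()) output.send(rest)
}

fun main(): Unit = runBlocking {
    val input = Channel<Item>(Channel.UNLIMITED)
    val output = Channel<Map<Int, List<Item>>>(Channel.UNLIMITED)

    input.send(Item(1, 10))
    input.send(Item(1, 11))
    input.send(Item(2, 20))
    input.close()

    bufferAndFlush(input, output, periodMillis = 60_000)
    output.close()

    for (batch in output) println(batch)
}
```

    Swapping the map under the lock and sending outside it keeps the consumer from ever adding to a map that has already been handed off. One caveat of this sketch: if the flusher is cancelled while it is suspended in send, the batch it was holding is dropped, so a production version would need to decide how to handle that case.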