I'm working on a hobby project using Kotlin Coroutines for the first time. I have read about them and watched videos, and I kinda get the concept. But I'm stuck on a problem. Let me show you my code.
    package com.dev.tuber.ingestion.snapshots

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.coroutineScope
    import org.joda.time.LocalTime
    import java.util.*
    import java.util.concurrent.ConcurrentHashMap
    import kotlin.concurrent.fixedRateTimer

    object SnapshotsBuffer {
        private val buffer = ConcurrentHashMap<Int, MutableMap<Int, Queue<Snapshot>>>()

        init {
            for (minute in 0..59) {
                buffer[minute] = mutableMapOf()
            }
        }

        suspend fun start(snapshotsChannel: Channel<Snapshot>, composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
            startComposing(composeSnapshots)
            for (snapshot in snapshotsChannel) {
                val currentMinute = getCurrentMinute()
                if (!buffer[currentMinute]!!.containsKey(snapshot.pair.id)) {
                    buffer[currentMinute]!![snapshot.pair.id] = LinkedList()
                }
                buffer[currentMinute]!![snapshot.pair.id]!!.add(snapshot)
                println(buffer)
            }
        }

        private fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
            val oneMinute = (1000 * 60).toLong()
            fixedRateTimer("consuming", true, oneMinute, oneMinute) {
                val previousMinute = getPreviousMinute()
                composeSnapshots.send(buffer[previousMinute]!!) // <---- cannot do this
                buffer[getPreviousMinute()] = mutableMapOf()
            }
        }

        private fun getCurrentMinute(): Int {
            return LocalTime().minuteOfHour
        }

        private fun getPreviousMinute(): Int {
            val currentMinute = getCurrentMinute()
            if (currentMinute == 0) return 59
            return currentMinute - 1
        }
    }
So, I have two channels. The first channel is snapshotsChannel; this is where each Snapshot will arrive. I want to buffer the snapshots, and whenever a minute passes I want to send the buffer to the composeSnapshots channel for further processing.
Basically, I get A LOT OF Snapshots and I don't want to send them directly to further processing. That's why I want to buffer them per minute per pair.
The problem arises in the startComposing function. The fixedRateTimer callback is not a suspending function, so I cannot call send there. I'm kinda stuck now because I cannot find a solution for this. I have looked into TickerChannel and Kotlin Flow, but neither seems like the right solution for my problem.
Do you know a solution?
You cannot call a suspend function (suspend fun Channel.send(element: E)) from a non-suspending function.
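If the timer callback has to stay, one possible workaround is the channel's non-suspending trySend, which returns a result instead of suspending. The following is only a minimal sketch under assumptions: collectTicks, the tick counter, and the unlimited channel capacity are hypothetical and not part of the question's code, and trySend requires a reasonably recent kotlinx.coroutines release.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.fixedRateTimer

// Hypothetical helper: a plain timer thread feeds a channel without suspending.
fun collectTicks(count: Int, periodMillis: Long): List<Int> = runBlocking {
    // Unlimited capacity (an assumption), so trySend only fails once the channel is closed.
    val ticks = Channel<Int>(Channel.UNLIMITED)
    var next = 0 // only touched by the timer thread
    val timer = fixedRateTimer("ticks", daemon = true, initialDelay = 0L, period = periodMillis) {
        // trySend is a regular (non-suspend) function, so it is allowed in this callback.
        val result = ticks.trySend(next++)
        if (result.isFailure) println("tick dropped")
    }
    try {
        // receive() suspends the coroutine until the timer thread has sent a value.
        List(count) { ticks.receive() }
    } finally {
        timer.cancel()
    }
}

fun main() {
    println(collectTicks(count = 3, periodMillis = 100L))
}
```

The trade-off is that trySend gives up backpressure: with a bounded channel it would fail instead of waiting, so the caller has to decide what to do with a dropped element.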
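The coroutine way is to run an infinite loop that suspends itself for a minute and then sends to the channel, over and over. The great thing is that delay cooperates with cancellation: cancelling the coroutine ends the loop. Since the loop never returns, start has to launch it in its own coroutine (for example with launch inside coroutineScope) rather than call it inline before the for loop.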
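    // Needs: import kotlinx.coroutines.delay
    private suspend fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
        val oneMinute = 60_000L
        // Runs until the surrounding coroutine is cancelled.
        while (true) {
            delay(oneMinute) // suspends without blocking a thread
            val previousMinute = getPreviousMinute()
            composeSnapshots.send(buffer[previousMinute]!!)
            // Hand the sent map over and start the slot fresh.
            buffer[previousMinute] = mutableMapOf()
        }
    }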
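To show how such a delay loop can run next to the loop that consumes the input channel, here is a self-contained sketch. It is not the question's SnapshotsBuffer: the simplified Snapshot class, bufferAndFlush, periodMillis, and the Mutex-guarded single buffer (instead of 60 per-minute slots) are all assumptions made to keep the example small.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the question's Snapshot type.
data class Snapshot(val pairId: Int, val price: Double)

// Buffers snapshots per pair and sends one batch roughly every periodMillis.
suspend fun bufferAndFlush(
    snapshots: Channel<Snapshot>,
    batches: Channel<Map<Int, List<Snapshot>>>,
    periodMillis: Long
) {
    coroutineScope {
        val mutex = Mutex()
        var buffer: MutableMap<Int, MutableList<Snapshot>> = mutableMapOf()

        // The delay loop lives in its own coroutine, so it does not block the consumer below.
        val flusher = launch {
            while (true) {
                delay(periodMillis)
                mutex.withLock {
                    if (buffer.isNotEmpty()) {
                        batches.send(buffer)
                        buffer = mutableMapOf() // the sent map now belongs to the receiver
                    }
                }
            }
        }

        // Consumer loop: ends when the input channel is closed.
        for (snapshot in snapshots) {
            mutex.withLock {
                buffer.getOrPut(snapshot.pairId) { mutableListOf() }.add(snapshot)
            }
        }

        // Stop the flusher while holding the lock, so it cannot be in the middle of a send.
        mutex.withLock { flusher.cancelAndJoin() }
        if (buffer.isNotEmpty()) batches.send(buffer)
        batches.close()
    }
}

fun main() = runBlocking {
    val snapshots = Channel<Snapshot>()
    val batches = Channel<Map<Int, List<Snapshot>>>()
    launch { bufferAndFlush(snapshots, batches, periodMillis = 200L) }
    launch {
        repeat(5) { i ->
            snapshots.send(Snapshot(pairId = i % 2, price = 100.0 + i))
            delay(100L)
        }
        snapshots.close()
    }
    for (batch in batches) println(batch)
}
```

Holding the lock during send means a slow receiver also slows down buffering, which acts as simple backpressure; whether that is desirable depends on how many snapshots arrive per period.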