kotlin, kotlinx.coroutines

What's the recommended way to delay Kotlin's buildSequence?


I'm trying to poll a paginated API and provide new items to the user as they appear.

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        // result is a List<T>
        val result = dataSource.getFirstPage()
        yieldAll(/* the new data in `result` */)

        // Block the thread for a little bit
    }
}

Here's the sample usage:

for (item in connect()) {
    // do something as each item is made available
}

My first thought was to use the delay function, but I get this message:

Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope

This is the signature for buildSequence:

public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>

I think this message means that I can only use the suspend functions in SequenceBuilder (yield and yieldAll) and that arbitrary suspend function calls aren't allowed.

Right now I'm using this to block the sequence building for one second each time the API is polled:

val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
    // do nothing
}

This works, but it really doesn't seem like a good solution. Has anybody encountered this issue before?


Solution

  • Why does it not work? Some research

    When we look at buildSequence, we can see that it takes a builderAction: suspend SequenceBuilder<T>.() -> Unit as its argument. As a client of that method, you pass in a suspend lambda that has SequenceBuilder as its receiver (see the Kotlin documentation on lambdas with receivers).
    SequenceBuilder itself is annotated with @RestrictsSuspension:

    @RestrictsSuspension
    @SinceKotlin("1.1")
    public abstract class SequenceBuilder<in T> ...
    

    The annotation is defined and commented like this:

    /**
     * Classes and interfaces marked with this annotation are restricted
     * when used as receivers for extension `suspend` functions. 
     * These `suspend` extensions can only invoke other member or extension     
     * `suspend` functions on this particular receiver only 
     * and are restricted from calling arbitrary suspension functions.
     */
    @SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
    public annotation class RestrictsSuspension
    

    As the RestrictsSuspension documentation states, the lambda you pass to buildSequence has SequenceBuilder as its receiver, but with restricted possibilities: it can only call "other member or extension suspend functions on this particular receiver". That means the block passed to buildSequence may call any suspend function defined on SequenceBuilder (like yield and yieldAll), as well as ordinary non-suspending functions. Since, on the other hand, the block is "restricted from calling arbitrary suspension functions", using delay does not work. The compiler error confirms it:

    Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope.
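
To make the rule concrete, here is a minimal sketch of what the restriction still permits. It assumes Kotlin 1.3 or later, where buildSequence and SequenceBuilder were renamed to sequence and SequenceScope. The helper yieldTwice is a made-up name for illustration only.

```kotlin
// Hypothetical helper: a suspending extension on the restricted receiver.
// It compiles because it only calls the receiver's own suspend member, yield.
suspend fun <T> SequenceScope<T>.yieldTwice(value: T) {
    yield(value)
    yield(value)
}

fun repeated(): Sequence<Int> = sequence {
    yieldTwice(1)
    yieldTwice(2)
    // delay(1000) would not compile here: delay is neither a member
    // nor an extension of SequenceScope.
}

fun main() {
    println(repeated().toList()) // prints [1, 1, 2, 2]
}
```

Ordinary (non-suspending) calls such as println or Thread.sleep are not affected by the restriction.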

    Finally, be aware that buildSequence creates a synchronous coroutine. In your example, the sequence code is executed on the same thread that consumes the sequence, that is, the thread iterating over the result of connect().
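
The synchronous behavior can be checked with a small sketch. It assumes Kotlin 1.3 or later (sequence is the renamed buildSequence), and producerThreads is a made-up name. The sequence body reports the thread it runs on, and the consumer compares that with its own thread.

```kotlin
fun producerThreads(): Sequence<String> = sequence {
    // Each yield records the thread the sequence body is currently running on.
    yield(Thread.currentThread().name)
    yield(Thread.currentThread().name)
}

fun main() {
    val consumer = Thread.currentThread().name
    for (producer in producerThreads()) {
        // Both names are the same: the body runs on the iterating thread.
        println("consumer=$consumer, producer=$producer")
    }
}
```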

    How to delay the sequence?

    As we learned, buildSequence creates a synchronous sequence, so it's fine to use regular thread blocking here:

    fun connect(): Sequence<T> = buildSequence {
        while (true) {
            val result = dataSource.getFirstPage()
            yieldAll(result)
            Thread.sleep(1000)
        }
    }
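
For experimenting with this approach, here is a self-contained sketch, assuming Kotlin 1.3 or later (sequence instead of buildSequence). The names pollNewItems and fetchPage are hypothetical; fetchPage stands in for dataSource.getFirstPage(). Filtering by a set of already seen items is only one possible way to pick out "the new data", not something the question prescribes.

```kotlin
// Hypothetical polling helper: fetchPage stands in for dataSource.getFirstPage().
fun <T> pollNewItems(pollIntervalMillis: Long, fetchPage: () -> List<T>): Sequence<T> = sequence {
    val seen = HashSet<T>() // grows without bound; acceptable for a sketch only
    while (true) {
        // seen.add returns true only for items not seen in earlier polls.
        yieldAll(fetchPage().filter { seen.add(it) })
        // Blocks the consuming thread, because the sequence body runs on it.
        Thread.sleep(pollIntervalMillis)
    }
}

fun main() {
    var calls = 0
    val items = pollNewItems(pollIntervalMillis = 10) {
        calls++
        listOf(calls, calls + 1) // consecutive pages overlap by one item
    }
    println(items.take(4).toList()) // prints [1, 2, 3, 4]
}
```

Because the sequence is lazy, the sleep only happens when the consumer asks for an item beyond the current page.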
    

    But do you really want an entire thread to be blocked? Alternatively, you can implement an asynchronous sequence, in which delay and other suspending functions are valid.
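
One way to get an asynchronous sequence today is Flow from kotlinx.coroutines. The sketch below is only an illustration under that assumption (it needs the kotlinx-coroutines-core dependency) and is not necessarily the approach the answer originally linked to. DataSource and CountingSource are hypothetical stand-ins for the question's dataSource.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's dataSource.
interface DataSource<T> {
    fun getFirstPage(): List<T>
}

// Fake source for the demo: each poll returns one new number.
class CountingSource : DataSource<Int> {
    private var page = 0
    override fun getFirstPage(): List<Int> = listOf(page++)
}

fun <T> connect(dataSource: DataSource<T>, pollIntervalMillis: Long = 1000): Flow<T> = flow {
    while (true) {
        val result = dataSource.getFirstPage()
        for (item in result) emit(item)
        // Suspends the collecting coroutine without blocking its thread.
        delay(pollIntervalMillis)
    }
}

fun main(): Unit = runBlocking {
    // take(3) cancels the otherwise endless polling loop after three items.
    val items = connect(CountingSource(), pollIntervalMillis = 10).take(3).toList()
    println(items) // prints [0, 1, 2]
}
```

The flow builder is not a restricted scope, so delay compiles here. The trade-off is that the consumer must collect from a coroutine rather than use a plain for loop.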