I'm trying to poll a paginated API and provide new items to the user as they appear.
fun connect(): Sequence<T> = buildSequence {
while (true) {
// result is a List<T>
val result = dataSource.getFirstPage()
yieldAll(/* the new data in `result` */)
// Block the thread for a little bit
}
}
Here's the sample usage:
for (item in connect()) {
// do something as each item is made available
}
My first thought was to use the delay
function, but I get this message:
Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope
This is the signature for buildSequence
:
public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>
I think this message means that I can only use the suspend
functions in SequenceBuilder: yield
and yieldAll
and that using arbitrary suspend
function calls aren't allowed.
Right now I'm using this to block the sequence building by one second after every time the API is polled:
val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
// do nothing
}
This works, but it really doesn't seem like a good solution. Has anybody encountered this issue before?
When we look at buildSequence
, we can see that it takes an builderAction: suspend SequenceBuilder<T>.() -> Unit
as its argument. As a client of that method, you'll be able to hand on a suspend
lambda that has SequenceBuilder
as its receiver (read about lambda with receiver here).
The SequenceBuilder
itself is annotated with RestrictSuspension
:
@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...
The annotation is defined and commented like this:
/**
* Classes and interfaces marked with this annotation are restricted
* when used as receivers for extension `suspend` functions.
* These `suspend` extensions can only invoke other member or extension
* `suspend` functions on this particular receiver only
* and are restricted from calling arbitrary suspension functions.
*/
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension
As the RestrictSuspension
documentation tells, in the case of buildSequence
, you can pass a lambda with SequenceBuilder
as its receiver but with restricted possibilities since you'll only be able to call "other member or extension suspend
functions on this particular receiver". That means, the block passed to buildSequence
may call any method defined on SequenceBuilder
(like yield
, yieldAll
). Since, on the other hand, the block is "restricted from calling arbitrary suspension functions", using delay
does not work. The resulting compiler error verifies it:
Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope.
Ultimately, you need to be aware that the buildSequence
creates a coroutine that is an example of a synchronous coroutine. In your example, the sequence code will be executed in the same thread that consumes the sequence by calling connect()
.
As we learned, The buildSequence
creates a synchronous sequence. It's fine to use regular Thread blocking here:
fun connect(): Sequence<T> = buildSequence {
while (true) {
val result = dataSource.getFirstPage()
yieldAll(result)
Thread.sleep(1000)
}
}
But, do you really want an entire thread to be blocked? Alternatively, you can implement asynchronous sequences as described here. As a result, using delay
and other suspending functions will be valid.