5

I'm trying to poll a paginated API and provide new items to the user as they appear.

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        // result is a List<T>
        val result = dataSource.getFirstPage()
        yieldAll(/* the new data in `result` */)

        // Block the thread for a little bit
    }
}

Here's the sample usage:

for (item in connect()) {
    // do something as each item is made available
}

My first thought was to use the delay function, but I get this message:

Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope

This is the signature for buildSequence:

public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>

I think this message means that I can only use the suspend functions in SequenceBuilder: yield and yieldAll and that using arbitrary suspend function calls aren't allowed.

Right now I'm using this to block the sequence building by one second after every time the API is polled:

val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
    // do nothing
}

This works, but it really doesn't seem like a good solution. Has anybody encountered this issue before?

mattbdean
  • 2,532
  • 5
  • 26
  • 58
  • 1
    Your current hackaround actually utilizes an entire CPU thread to 100% while the sequence is running...yikes. Use a scheduled executor + concurrent queue in the meantime. – F. George Mar 13 '18 at 18:02
  • 1
    Note that `buildSequence` can only be delayed synchronously: since the coroutine is run by an iterator, it always expects a suspension with a proper sequence element to return it as the iterator's next item. That's the reason why it `RestrictsSuspension`. Another concern, though, is that you use active wait instead of blocking the thread. – hotkey Mar 13 '18 at 18:25

2 Answers2

12

Why does it not work? Some research

When we look at buildSequence, we can see that it takes an builderAction: suspend SequenceBuilder<T>.() -> Unit as its argument. As a client of that method, you'll be able to hand on a suspend lambda that has SequenceBuilder as its receiver (read about lambda with receiver here).
The SequenceBuilder itself is annotated with RestrictSuspension:

@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...

The annotation is defined and commented like this:

/**
 * Classes and interfaces marked with this annotation are restricted
 * when used as receivers for extension `suspend` functions. 
 * These `suspend` extensions can only invoke other member or extension     
 * `suspend` functions on this particular receiver only 
 * and are restricted from calling arbitrary suspension functions.
 */
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension

As the RestrictSuspension documentation tells, in the case of buildSequence, you can pass a lambda with SequenceBuilder as its receiver but with restricted possibilities since you'll only be able to call "other member or extension suspend functions on this particular receiver". That means, the block passed to buildSequence may call any method defined on SequenceBuilder (like yield, yieldAll). Since, on the other hand, the block is "restricted from calling arbitrary suspension functions", using delay does not work. The resulting compiler error verifies it:

Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope.

Ultimately, you need to be aware that the buildSequence creates a coroutine that is an example of a synchronous coroutine. In your example, the sequence code will be executed in the same thread that consumes the sequence by calling connect().

How to delay the sequence?

As we learned, The buildSequence creates a synchronous sequence. It's fine to use regular Thread blocking here:

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        val result = dataSource.getFirstPage()
        yieldAll(result)
        Thread.sleep(1000)
    }
}

But, do you really want an entire thread to be blocked? Alternatively, you can implement asynchronous sequences as described here. As a result, using delay and other suspending functions will be valid.

Dave Leeds
  • 698
  • 6
  • 7
s1m0nw1
  • 76,759
  • 17
  • 167
  • 196
2

Just for an alternate solution...

If what you're really trying to do is asynchronously produce elements, you can use Flows which are basically asynchronous sequences.

Here is a quick table:

Sync Async
Single Normal value

fun example(): String
suspending

suspend fun example(): String
or
fun example(): Deferred<String>
Many Sequence

fun example(): Sequence<String>
Flow

fun example(): Flow<String>

You can convert your Sequence<T> to a Flow<T> by replacing the sequence { ... } builder with the flow { ... } builder and then replace yield/yieldAll with emit/emitAll:

fun example(): Flow<String> = flow {
    (1..5).forEach { getString().let { emit(it) } }
}

suspend fun getString(): String = { ... }

So, for your example:

fun connect(): Flow<T> = flow {
    while (true) {

        // Call suspend function to get data from dataSource
        val result: List<T> = dataSource.getFirstPage()
        emitAll(result)

        // _Suspend_ for a little bit
        delay(1000)
    }
}
Matt Klein
  • 7,856
  • 6
  • 45
  • 46