
I came across the following implementation of an inclusive takeWhile:

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

The problem is I'm not 100% convinced this is safe if used on a parallel sequence.

My concern is that we'd be relying on the shouldContinue variable to know when to stop, but we're not synchronizing its access.

Any insights?

jivimberg
  • I agree that it's not safe, `takeWhile` should get a stateless function. BTW synchronization would be the least of your problems if this was used in a parallel computation. `takeWhile` isn't even defined in that case. – Marko Topolnik May 16 '18 at 14:38
  • I'm not under the impression Sequences are intended for parallel use cases at all? – Louis Wasserman May 16 '18 at 21:52
  • @LouisWasserman They aren't intended for parallel, but their contract doesn't constrain them to strictly sequential processing. Specifically, the contract of `takeWhile` states "The operation is intermediate and _stateless_." – Marko Topolnik May 17 '18 at 10:02
  • @MarkoTopolnik my understanding is that the phrase "The operation is intermediate and stateless." from the `takeWhile` documentation refers to the whole operation, and not specifically to the predicate. In particular https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html defines stateless as: *operations which require no state and process each element independently like kotlin.sequences.Sequence.map or **require a small constant amount of state to process an element**, for example kotlin.sequences.Sequence.take or kotlin.sequences.Sequence.drop;* – jivimberg May 21 '18 at 05:15
  • Yes, you're right. Unfortunately that's the closest the docs have on it. The state that sentence refers to is the implementation's internal state. The contract doesn't explicitly state the user's function must be stateless. There's a good discussion on this in [java.util.stream](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html) package Javadoc, section "Stateless behaviors". However, in your case the problem isn't state itself, but that your function _assumes sequential encounter order_. – Marko Topolnik May 21 '18 at 05:43
  • @MarkoTopolnik thanks for the pointer! I believe in Java there's a strong requirement on the predicate being stateless because they have to support *parallelStream()*. In Kotlin there's no such requirement but the comment still applies. So I guess that if the only way of making this safe is synchronizing on the `shouldContinue` it kind of defeats the purpose of parallel consumption. – jivimberg May 23 '18 at 14:50
  • My original point was that synchronization wouldn't solve anything. The result would still be completely broken. OTOH even with the guarantee that the computation is single-threaded, you are still not allowed to assume sequential encounter order for your function. – Marko Topolnik May 23 '18 at 14:58
  • Sorry but I didn't get why the result would be broken. And why can't we assume sequential encounter order in a single-thread execution. – jivimberg May 23 '18 at 15:49

1 Answer


Here's what I've figured out so far.

Question clarification

The question is unclear. There's no such thing as a parallel sequence; I probably got them mixed up with Java's parallel streams. What I meant was a sequence that is consumed concurrently.

Sequences are synchronous

As @LouisWasserman pointed out in the comments, sequences are not designed for parallel execution. In particular, the SequenceBuilder is annotated with @RestrictsSuspension. Citing from the Kotlin Coroutine repo:

It means that no SequenceBuilder extension of lambda in its scope can invoke suspendContinuation or other general suspending function

Having said that, as @MarkoTopolnik commented, they can still be used in a parallel program just like any other object.

Sequences used in parallel

As an example, here's a first attempt at using a Sequence from several coroutines at once. All of them share a single iterator:

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }
    }
}

This code prints:

[ForkJoinPool.commonPool-worker-2] Processor #1 received 1

[ForkJoinPool.commonPool-worker-1] Processor #0 received 0

[ForkJoinPool.commonPool-worker-3] Processor #2 received 2

[ForkJoinPool.commonPool-worker-2] Processor #3 received 3

[ForkJoinPool.commonPool-worker-1] Processor #4 received 3

[ForkJoinPool.commonPool-worker-3] Processor #5 received 3

[ForkJoinPool.commonPool-worker-1] Processor #7 received 5

[ForkJoinPool.commonPool-worker-2] Processor #6 received 4

[ForkJoinPool.commonPool-worker-1] Processor #9 received 7

[ForkJoinPool.commonPool-worker-3] Processor #8 received 6

This is of course not what we want: some numbers are consumed more than once (3 is received three times), while 8 and 9 are never received.
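The duplicates come from several workers calling next() on the same iterator without any coordination. As a minimal sketch (plain threads instead of coroutines, with a hypothetical helper named consumeWithLock that is not part of any library), guarding the iterator with a lock makes every element get handed out exactly once:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Hypothetical helper: ten threads each take one element from a shared iterator.
// The lock serializes every hasNext()/next() pair, so no element is handed out twice.
fun consumeWithLock(): List<Int> {
    val iterator = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).iterator()
    val lock = Any()
    val received = ConcurrentLinkedQueue<Int>()

    val workers = List(10) {
        thread {
            // Only the access to the iterator is guarded, not the processing.
            val value = synchronized(lock) {
                if (iterator.hasNext()) iterator.next() else null
            }
            if (value != null) received.add(value)
        }
    }
    workers.forEach { it.join() }

    // Arrival order depends on scheduling, so sort before returning.
    return received.sorted()
}

fun main() {
    println(consumeWithLock())
}
```

This only serializes access to the iterator. It does not make a stateful predicate such as the one in takeWhileInclusive meaningful under concurrent consumption.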

Enter channels

On the other hand, if we were to use channels, we could write something like this:

fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

Then the output is:

[ForkJoinPool.commonPool-worker-2] Processor #0 received 1

[ForkJoinPool.commonPool-worker-2] Processor #0 received 2

[ForkJoinPool.commonPool-worker-1] Processor #1 received 3

[ForkJoinPool.commonPool-worker-2] Processor #2 received 4

[ForkJoinPool.commonPool-worker-1] Processor #3 received 5

[ForkJoinPool.commonPool-worker-2] Processor #4 received 6

[ForkJoinPool.commonPool-worker-2] Processor #0 received 7

[ForkJoinPool.commonPool-worker-1] Processor #1 received 8

[ForkJoinPool.commonPool-worker-1] Processor #2 received 9

[ForkJoinPool.commonPool-worker-2] Processor #3 received 10

Furthermore, we could implement the takeWhileInclusive method for channels like this:

fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}

And it works as expected.
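For plain sequences, a related issue with the version in the question is that shouldContinue is created once per call rather than once per iteration, so iterating the returned Sequence a second time yields nothing. Here is a minimal sketch of a variant that keeps no captured mutable state, assuming Kotlin 1.3 or later for the sequence { } builder. The name takeWhileInclusiveStateless is made up for this example:

```kotlin
// Hypothetical name; not part of the Kotlin standard library.
// Each iteration of the returned Sequence starts a fresh run of the builder block,
// so nothing is shared between passes.
fun <T> Sequence<T>.takeWhileInclusiveStateless(pred: (T) -> Boolean): Sequence<T> =
    sequence {
        for (element in this@takeWhileInclusiveStateless) {
            yield(element)             // always emit the current element
            if (!pred(element)) break  // stop right after the first failing element
        }
    }

fun main() {
    val numbers = sequenceOf(1, 2, 3, 4, 5).takeWhileInclusiveStateless { it < 3 }
    println(numbers.toList()) // first pass
    println(numbers.toList()) // second pass over the same Sequence
}
```

Both passes give 1, 2, 3. A single iterator obtained from this sequence is still not safe to share between threads; the sketch only removes the state shared across iterations.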

jivimberg