
I have a function generating a very long sequence of work items. Generating these items is fast, but there are too many in total to store a list of them in memory. Processing the items produces no results, just side effects.

I would like to process these items across multiple threads. One solution is to have a thread read from the generator and write to a concurrent bounded queue, and a number of executor threads polling for work from the bounded queue, but this is a lot of things to set up.
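For reference, this is the kind of setup I mean (a minimal sketch; the `Long` items, the capacity of 100, and the summing side effect are just stand-ins for my real generator and `process` function):

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread

val POISON = Long.MIN_VALUE  // sentinel telling workers to stop
val processed = AtomicLong() // stands in for the real side effect

fun process(item: Long) { processed.addAndGet(item) }

fun main() {
    val queue = ArrayBlockingQueue<Long>(100) // bounded: producer blocks when full
    val workers = (1..4).map {
        thread {
            while (true) {
                val item = queue.take() // blocks when the queue is empty
                if (item == POISON) break
                process(item)
            }
        }
    }
    // Producer: generate items lazily and feed the queue
    (0L until 10_000L).forEach { queue.put(it) }
    repeat(workers.size) { queue.put(POISON) } // one sentinel per worker
    workers.forEach { it.join() }
    println(processed.get()) // prints 49995000
}
```

It works, but it's exactly this producer thread, sentinel handling, and worker pool that I was hoping the standard library could spare me.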

Is there anything in the standard library that would help me do that?

I had initially tried

items.map { async(executor) process(it) }.forEach { it.await() }

But, as pointed out in "how to implement parallel mapping for sequences in kotlin", this doesn't work, for reasons that are obvious in retrospect: the sequence is lazy, so `forEach` pulls one element at a time and each `async` is awaited before the next one is even started, giving no parallelism.

Is there a quick way to do this (possibly with an external library), or is manually setting up a bounded queue in the middle my best option?

zale
  • Take a look at this: https://kotlinlang.org/docs/reference/coroutines/flow.html – Daniel Nov 25 '19 at 16:05
  • As far as I know, parallel execution is one of the (few) advantages that the Java Stream still has over the Kotlin Sequence. So maybe you could use streams instead. – findusl Nov 25 '19 at 16:38
  • @findusl - I've attempted a thing with a parallel stream, but it doesn't seem to work well. I have no good source, but e.g. https://stackoverflow.com/a/30826175/ "There are known problems about processing infinite streams in parallel. In particular there's no way to split the task to equal parts effectively." – zale Nov 25 '19 at 18:09

3 Answers


You can look at coroutines combined with channels.

If all work items can be emitted on demand with a producer channel, then you can await each item and process it with a pool of threads.

An example:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

sealed class Stream {
    object End : Stream()
    class Item(val data: Long) : Stream()
}

fun main() = runBlocking {
    val produceCtx = newSingleThreadContext("producer")
    // A dummy producer that sends one million Longs on its own thread
    val producer = CoroutineScope(produceCtx).produce {
        for (i in 0 until 1_000_000L) send(Stream.Item(i))
        repeat(4) { send(Stream.End) } // one End per worker, so no receiver hangs
    }

    val workCtx = newFixedThreadPoolContext(4, "work")
    val workers = Channel<Unit>(4)
    repeat(4) { workers.trySend(Unit) }

    for (_nothing in workers) { // launch 4 times, then wait for a task to finish
        launch(workCtx) {
            when (val item = producer.receive()) {
                Stream.End -> workers.close()
                is Stream.Item -> {
                    workFunction(item.data)  // actual work here
                    workers.trySend(Unit)    // notify to launch a new task
                }
            }
        }
    }
}
Lionel Briand

Your magic word would be .asSequence():

items
  .asSequence() // creates a lazy sequence instead of an intermediate list
  .forEach { launch { executor.process(it) } } // inside a CoroutineScope; if you don't need the value afterwards, use 'launch', a.k.a. "fire and forget"

but there are too many in total to store a list of them in memory

Then don't map to a list and don't collect the values, no matter whether you work with Kotlin or Java.

Neo

As long as you are on the JVM, you can write yourself an extension function that works through the sequence in chunks and spawns futures for all entries in a chunk. Something like this:

import java.util.concurrent.Executors

fun <T, R> Sequence<T>.mapParallel(action: (value: T) -> R?): Sequence<R?> {
  val numThreads = Runtime.getRuntime().availableProcessors() - 1
  return this
    .chunked(numThreads)
    .map { chunk ->
      val threadPool = Executors.newFixedThreadPool(numThreads)
      try {
        return@map chunk
          .map {
            // CAUTION -> needs to be written like this
            // otherwise the submit(Runnable) overload is called
            // which always returns an empty Future!!!
            val callable: () -> R? = { action(it) }
            threadPool.submit(callable)
          }
      } finally {
        threadPool.shutdown()
      }
    }
    .flatten()
    .map { future -> future.get() }
}

You can then just use it like:

items
  .mapParallel { /* process an item */ }
  .forEach { /* handle the result */ }

As long as the workload per item is similar, this gives good parallel processing.

Mathias Henze