17

I have a List<Flow<T>>, and would like to generate a Flow<List<T>>. This is almost what combine does - except that combine waits for each and every Flow to emit an initial value, which is not what I want. Take this code for example:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}

With combine (and hence as-is), this is the output:

[a2, b1, c]
[a2, b2, c]

Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Right now I have two work-arounds, but none of them are great... The first one is plain ugly and doesn't work with nullable types:

val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}

By forcing all the flows to emit a first, irrelevant value, the combine transformer is indeed called, and lets me remove the null values which I know are not actual values. Iterating on that, more readable but heavier:

sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}

Now this one works just fine, but still feels like I'm overdoing stuff. Is there a method that I'm missing in the coroutines library?

Community
  • 1
  • 1
Marc Plano-Lesay
  • 6,808
  • 10
  • 44
  • 75
  • You could replace `FlowValueHolder` with just `class Holder(val value: T)` and your flow would be `Flow?>`. This upgrades your first, simpler example to nullable `T`. – Marko Topolnik Apr 13 '20 at 10:38
  • Oh yeah that's a good point! I would still like to see something better, but that's a nice improvement, thanks! – Marc Plano-Lesay Apr 13 '20 at 11:06
  • @MarcPlano-Lesay I don't think you can do it any better than you did in your first approach, although I wouldn't consider `[]` and intermediate step, because actually nothing was emitted. – Willi Mentzel Apr 14 '20 at 17:33
  • 1
    @WilliMentzel That's a fair point about `[]`. In my specific case, I don't really care about an empty list being emitted, but it would make more sense not to have anything indeed. I'm currently using a slightly improved version, I guess I'll post it as an answer later today :-) – Marc Plano-Lesay Apr 14 '20 at 20:43
  • @MarcPlano-Lesay I also posted an answer :). pls take a look – Willi Mentzel Apr 17 '20 at 09:45

3 Answers3

14

How about this:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}

It solves a few problems:

  • no need to introduce a new type
  • [] is not in the resulting Flow
  • abstracts away null-handling (or however it is solved) from the call-site, the resulting Flow deals with it itself

So, you won't notice any implementation specific workarounds, because you don't have to deal with it during collection:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}

Output:

[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Try it out here!

Edit: Updated answer to handle Flows which emit null values too.


* The used low-level array is thread-safe. It's as if you are dealing with single variables.

Willi Mentzel
  • 27,862
  • 20
  • 113
  • 121
  • 1
    I like the idea! Sadly you're enforcing `T` to be non-nullable (needs to inherit from `Any`, can't be `Any?`). So a more generic version would still need to wrap the values in a holder. But combined with the array idea, this solves nicely the `[]` issue! – Marc Plano-Lesay Apr 17 '20 at 21:12
  • @MarcPlano-Lesay yes, you are right! that's not optimal. I used null for encoding presence which then in turn does not allow Flows which emit null values. We need a wrapper type indeed. I solved it using a simple Pair. :) – Willi Mentzel Apr 18 '20 at 10:15
  • 1
    I think keeping a wrapper class rather than a boolean is slightly cleaner (you're allocating a pair anyway, so it's similar in terms of cost), but I think that's at nice as it can be! – Marc Plano-Lesay Apr 19 '20 at 00:20
3

I would still like to avoid mapping to an intermediary wrapper type, and as someone mentioned in the comments, the behaviour is slightly wrong (this emits an empty list at first if no arguments emitted anything yet), but this is slightly nicer than the solutions I had in mind when I wrote the question (still really similar) and works with nullable types:

inline fun <reified T> instantCombine(
  flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
  flow.map {
    @Suppress("USELESS_CAST") // Required for onStart(null)
    Holder(it) as Holder<T>?
  }
    .onStart { emit(null) }
}) {
  it.filterNotNull()
    .map { holder -> holder.value }
}

And here's a test suite that passes with this implementation:

class InstantCombineTest {
  @Test
  fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
    assertThat(instantCombine(emptyList<Flow<String>>()).toList())
      .isEmpty()
  }

  @Test
  fun `intermediate steps are emitted`() = runBlockingTest {
    val a = flow {
      delay(20)
      repeat(3) {
        emit("a$it")
        delay(100)
      }
    }
    val b = flow {
      repeat(3) {
        delay(150)
        emit("b$it")
      }
    }
    val c = flow {
      delay(400)
      emit("c")
    }

    assertThat(instantCombine(a, b, c).toList())
      .containsExactly(
        emptyList<String>(),
        listOf("a0"),
        listOf("a1"),
        listOf("a1", "b0"),
        listOf("a2", "b0"),
        listOf("a2", "b1"),
        listOf("a2", "b1", "c"),
        listOf("a2", "b2", "c")
      )
      .inOrder()
  }

  @Test
  fun `a single flow is mirrored`() = runBlockingTest {
    val a = flow {
      delay(20)
      repeat(3) {
        emit("a$it")
        delay(100)
      }
    }

    assertThat(instantCombine(a).toList())
      .containsExactly(
        emptyList<String>(),
        listOf("a0"),
        listOf("a1"),
        listOf("a2")
      )
      .inOrder()
  }

  @Test
  fun `null values are kept`() = runBlockingTest {
    val a = flow {
      emit("a")
      emit(null)
      emit("b")
    }

    assertThat(instantCombine(a).toList())
      .containsExactly(
        emptyList<String?>(),
        listOf("a"),
        listOf(null),
        listOf("b")
      )
      .inOrder()
  }
}
Marc Plano-Lesay
  • 6,808
  • 10
  • 44
  • 75
2

I think you might be looking for .merge():

fun <T> Iterable<Flow<T>>.merge(): Flow<T>
fun <T> merge(vararg flows: Flow<T>): Flow<T> 

Merges the given flows into a single flow without preserving an order of elements. All flows are merged concurrently, without limit on the number of simultaneously collected flows.

The default .merge() implementation works like this

public fun <T> Iterable<Flow<T>>.merge(): Flow<T> =
  channelFlow {
    forEach { flow ->
      launch {
        flow.collect { send(it) }
      }
    }
  }

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html

Matt Klein
  • 7,856
  • 6
  • 45
  • 46
  • `.merge()` sadly doesn't do what I'm after: it combines every `Flow` you give it in a single `Flow`, whereas what I'm after is a `Flow>`, with each input `Flow` mapping its most recent value to an index in the list. – Marc Plano-Lesay Dec 10 '22 at 00:30