I have a List<Flow<T>>, and would like to generate a Flow<List<T>>. This is almost what combine does, except that combine waits for each and every Flow to emit an initial value, which is not what I want. Take this code for example:
val a = flow {
    repeat(3) {
        emit("a$it")
        delay(100)
    }
}
val b = flow {
    repeat(3) {
        delay(150)
        emit("b$it")
    }
}
val c = flow {
    delay(400)
    emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
    combine(flows) {
        it.toList()
    }.collect { println(it) }
}
With combine (and hence as-is), this is the output:
[a2, b1, c]
[a2, b2, c]
Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Right now I have two workarounds, but neither of them is great. The first one is plain ugly and doesn't work with nullable types:
val flows = listOf(a, b, c).map {
    flow {
        emit(null)
        it.collect { emit(it) }
    }
}
runBlocking {
    combine(flows) {
        it.filterNotNull()
    }.collect { println(it) }
}
By forcing all the flows to emit a first, irrelevant value, the combine transformer is indeed called, and lets me remove the null values which I know are not actual values. Iterating on that, more readable but heavier:
sealed class FlowValueHolder {
    object None : FlowValueHolder()
    data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
    flow {
        emit(FlowValueHolder.None)
        it.collect { emit(FlowValueHolder.Some(it)) }
    }
}
runBlocking {
    combine(flows) {
        it.filterIsInstance(FlowValueHolder.Some::class.java)
            .map { it.value }
    }.collect { println(it) }
}
Now this one works just fine, but it still feels like I'm overdoing it. Is there a method that I'm missing in the coroutines library?
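For reference, here is a minimal sketch of how the second workaround could be wrapped into a reusable helper. The names combineEager and Unset are hypothetical (they are not part of kotlinx.coroutines); the sketch only relies on flow, emitAll and combine, and it uses a private sentinel object instead of null so that nullable element types keep working:

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical sentinel meaning "this flow has not emitted anything yet".
// Compared by identity, so it can never collide with a real value (or with null).
private object Unset

// Hypothetical helper, not a library function: seeds every source flow with the
// sentinel so that combine fires immediately, then strips the sentinels again.
@Suppress("UNCHECKED_CAST")
fun <T> combineEager(flows: List<Flow<T>>): Flow<List<T>> {
    val seeded: List<Flow<Any?>> = flows.map { source ->
        flow<Any?> {
            emit(Unset)      // placeholder so combine does not wait for this flow
            emitAll(source)  // then forward the real values unchanged
        }
    }
    return combine(seeded) { latest: Array<Any?> ->
        // Keep only real values; everything that is not the sentinel came from a Flow<T>.
        latest.filter { it !== Unset } as List<T>
    }
}
```

With the a, b and c flows above, combineEager(listOf(a, b, c)).collect { println(it) } should behave like the FlowValueHolder version. As with combine itself, emissions that arrive close together may be conflated, so treat the exact sequence of intermediate lists as timing-dependent.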