See this answer for "Unit test the new Kotlin coroutine StateFlow"
and this issue in Kotlin coroutines GitHub repo..
How can I test all emissions of this StateFlow (results
variable)?
class Emitter(dispatcher: CoroutineContext) {
private val coroutineScope = CoroutineScope(dispatcher)
private val source = MutableStateFlow("INITIAL")
private val _results = MutableStateFlow<String?>(null)
val results = _results.asStateFlow()
init {
source
.emitLatestEvery(5.seconds)
.conflate()
.map(String::lowercase)
.onEach(_results::emit)
.launchIn(coroutineScope)
}
@OptIn(ExperimentalCoroutinesApi::class)
private fun <T> Flow<T>.emitLatestEvery(duration: Duration) =
transformLatest {
while (true) {
emit(it)
delay(duration)
}
}
fun changeSource(s: String) {
source.value = s
}
}
Here is my test. It does not finish even if I use emitter.results
.take(3)
.toList(results)
:
class EmitterTest {
@OptIn(ExperimentalCoroutinesApi::class)
@Test fun `Sample test`() = runTest {
val dispatcher = UnconfinedTestDispatcher(testScheduler)
val emitter = Emitter(dispatcher)
val results = mutableListOf<String?>()
val job = launch(dispatcher) { emitter.results.toList(results) }
emitter.changeSource("a")
emitter.changeSource("aB")
emitter.changeSource("AbC")
Assertions.assertThat(results).isEqualTo(listOf(null, "initial", "a", "ab", "abc"))
job.cancel()
}
}