0

See this answer for "Unit test the new Kotlin coroutine StateFlow"
and this issue in Kotlin coroutines GitHub repo..

How can I test all emissions of this StateFlow (results variable)?

class Emitter(dispatcher: CoroutineContext) {
    private val coroutineScope = CoroutineScope(dispatcher)
    private val source = MutableStateFlow("INITIAL")
    private val _results = MutableStateFlow<String?>(null)
    val results = _results.asStateFlow()

    init {
        source
            .emitLatestEvery(5.seconds)
            .conflate()
            .map(String::lowercase)
            .onEach(_results::emit)
            .launchIn(coroutineScope)
    }

    @OptIn(ExperimentalCoroutinesApi::class)
    private fun <T> Flow<T>.emitLatestEvery(duration: Duration) =
        transformLatest {
            while (true) {
                emit(it)
                delay(duration)
            }
        }

    fun changeSource(s: String) {
        source.value = s
    }
}

Here is my test. It does not finish even if I use emitter.results.take(3).toList(results):

class EmitterTest {
    @OptIn(ExperimentalCoroutinesApi::class)
    @Test fun `Sample test`() = runTest {
        val dispatcher = UnconfinedTestDispatcher(testScheduler)
        val emitter = Emitter(dispatcher)
        val results = mutableListOf<String?>()
        val job = launch(dispatcher) { emitter.results.toList(results) }
        emitter.changeSource("a")
        emitter.changeSource("aB")
        emitter.changeSource("AbC")
        Assertions.assertThat(results).isEqualTo(listOf(null, "initial", "a", "ab", "abc"))
        job.cancel()
    }
}
Mahozad
  • 18,032
  • 13
  • 118
  • 133

0 Answers0