34

Recently, the class StateFlow was introduced as part of Kotlin coroutines.

I'm currently trying it and encountered an issue while trying to unit test my ViewModel. What I want to achieve: testing that my StateFlow is receiving all the state values in the correct order in my ViewModel.

My code is as follows.

ViewModel:

class WalletViewModel(private val getUserWallets: GetUersWallets) : ViewModel() {

val userWallet: StateFlow<State<UserWallets>> get() = _userWallets
private val _userWallets: MutableStateFlow<State<UserWallets>> =
        MutableStateFlow(State.Init)

fun getUserWallets() {
    viewModelScope.launch {
        getUserWallets.getUserWallets()
            .onStart { _userWallets.value = State.Loading }
            .collect { _userWallets.value = it }
    }
}

My test:

@Test
fun `observe user wallets ok`() = runBlockingTest {
    Mockito.`when`(api.getAssetWallets()).thenReturn(TestUtils.getAssetsWalletResponseOk())
    Mockito.`when`(api.getFiatWallets()).thenReturn(TestUtils.getFiatWalletResponseOk())

    viewModel.getUserWallets()
        
    val res = arrayListOf<State<UserWallets>>()
    viewModel.userWallet.toList(res) //doesn't works

    Assertions.assertThat(viewModel.userWallet.value is State.Success).isTrue() //works, last value enmited
}

Accessing the last value emitted works. But what I want to test is that all the emitted values are emitted in the correct order.

With this piece of code: viewModel.userWallet.toList(res) I'm getting the following error:

java.lang.IllegalStateException: This job has not completed yet
    at kotlinx.coroutines.JobSupport.getCompletionExceptionOrNull(JobSupport.kt:1189)
    at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest(TestBuilders.kt:53)
    at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest$default(TestBuilders.kt:45)
    at WalletViewModelTest.observe user wallets ok(WalletViewModelTest.kt:52)
....

I guess I'm missing something obvious. But not sure why as I'm just getting started with coroutines and Flow and this error seems to happen when not using runBlockingTest, which I use already.

EDIT:

As a temporary solution, I'm testing it as a live data:

@Captor
lateinit var captor: ArgumentCaptor<State<UserWallets>>
    
@Mock
lateinit var walletsObserver: Observer<State<UserWallets>>

@Test
fun `observe user wallets ok`() = runBlockingTest {
    viewModel.userWallet.asLiveData().observeForever(walletsObserver)
    
    viewModel.getUserWallets()
    captor.run {
        Mockito.verify(walletsObserver, Mockito.times(3)).onChanged(capture())
        Assertions.assertThat(allValues[0] is State.Init).isTrue()
        Assertions.assertThat(allValues[1] is State.Loading).isTrue()
        Assertions.assertThat(allValues[2] is State.Success).isTrue()
    }
}
Mahozad
  • 18,032
  • 13
  • 118
  • 133
agonist_
  • 4,890
  • 6
  • 32
  • 55
  • What about before calling ```viewModel.getUserWallets()``` you place this assert: ```Assertions.assertThat(viewModel.userWallet.value is State.Init).isTrue() ``` and skip this part: ```val res = arrayListOf>() viewModel.userWallet.toList(res) ``` – executioner May 31 '20 at 12:53
  • But im also struggling with testing StateFlow correctly, I tried using ```collectIndexed``` but i had the same error. Maybe its because this flow doesn't stop at the end of the ```runBlockingTest ``` block, i didn't find any solution how to cancel the StateFlow to be able to finish the job. I guess the right way to test it is to always test the value only after emitting some changes. Make sure you emit these on the Dispatchers.Main – executioner May 31 '20 at 12:57
  • @executioner I Updated my question with a temporary solution. Which works for now – agonist_ Jun 01 '20 at 01:35
  • Hey, i found a solution: viewModel.userWallet.take(NUMBER_OF_EXPECTED_VALUES).collect { list.add(it) } Then you can test the list values, just like the liveData solution. – executioner Jun 11 '20 at 07:49
  • @executioner great! Still with that I got the "This job has not completed yet" are you using it with runBlockingTest? – agonist_ Jun 18 '20 at 03:53
  • This works viewModel.countries .take(3) .onEach { res.add(it) } .launchIn(this) – agonist_ Jun 18 '20 at 06:07

8 Answers8

23

It seems that the Android team changed the API and documentation after this thread. You can check it here: Continuous collection

SharedFlow/StateFlow is a hot flow, and, as described in the docs, A shared flow is called hot because its active instance exists independently of the presence of collectors. It means the scope that launches the collection of your flow won't complete by itself.

To solve this issue, you need to cancel the scope in which collect is called, and as the scope of your test is the test itself, its not ok to cancel the test, so what you need is to launch it in a different job.

@Test
fun `Testing a integer state flow`() = runTest {
    val _intSharedFlow = MutableStateFlow(0)
    val intSharedFlow = _intSharedFlow.asStateFlow()
    val testResults = mutableListOf<Int>()

    val job = launch(UnconfinedTestDispatcher(testScheduler)) {
        intSharedFlow.toList(testResults)
    }
    _intSharedFlow.value = 5

    assertEquals(2, testResults.size)
    assertEquals(0, testResults.first())
    assertEquals(5, testResults.last())
    job.cancel()
}

Improved case: TestScope.backgroundScope ensures that the coroutine gets cancelled before the end of the test.

@Test
fun `Testing an integer state flow`() = runTest {
    val _intSharedFlow = MutableStateFlow(0)
    val intSharedFlow = _intSharedFlow.asStateFlow()
    val testResults = mutableListOf<Int>()

    backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
        intSharedFlow.toList(testResults)
    }
    _intSharedFlow.value = 5

    assertEquals(2, testResults.size)
    assertEquals(0, testResults.first())
    assertEquals(5, testResults.last())
}

A few important things:

  1. Always cancel your created job to avoid java.lang.IllegalStateException: This job has not completed yet
  2. As this is a StateFlow, when you start collecting (inside toList), you receive the last state. But if you first start collecting and then call your function viewModel.getUserWallets(), the result list will then have all the states, in case you want to test it too.
  3. The runTest API changed a little bit, and we need to use UnconfinedTestDispatcher(testScheduler) in the context of the launch call. The documentation says: Notice how UnconfinedTestDispatcher is used for the collecting coroutine here. This ensures that the collecting coroutine is launched eagerly and is ready to receive values after launch returns.
Marie
  • 194
  • 6
Javier Antón
  • 570
  • 4
  • 8
  • 1
    This is a good answer to assert the final list of collected values. Instead, if you need to test the behaviour, e.g. assert that the order of events an values is correct, e.g. event(s) 1 -> value(s) 1 -> event(s) 2 -> value(s) 2, etc., then you need to either break up your testable units into smaller ones (which might be cumbersome), or write more test boilerplate to be able to test all this. A testing library like https://github.com/cashapp/turbine can help to interleave events and (no) value/error/completion assertions. – Erik Jan 27 '21 at 10:04
  • 1
    You are right, but at the end you have all the results ordered. so you can check that every value is received in it's order. – Javier Antón Jan 27 '21 at 18:30
  • 1
    which is the import for this `StateFlow.toList`? cannot find it anywhere – desgraci Apr 19 '22 at 10:15
  • Your first example is not working for me. The first assertion fails: `expected:<2> but was:<0>`. – JonZarate Jun 08 '23 at 13:58
  • @JonZarate This was because you were using the Kotlin1.7 or higher and they changed the API, I've updated the answer (see point 3) – Javier Antón Jun 13 '23 at 10:38
9

Another way which I derived from this solution in Kotlin coroutines GitHub repository:

@Test fun `The StateFlow should emit all expected values`() = runTest {
    val dispatcher = UnconfinedTestDispatcher(testScheduler)
    val viewModel = MyViewModel(dispatcher)
    val results = mutableListOf<Int>()
    val job = launch(dispatcher) { viewModel.numbers.toList(results) }

    viewModel.addNumber(5)
    viewModel.addNumber(8)
    runCurrent() // Important

    assertThat(results).isEqualTo(listOf(0, 5, 8))
    job.cancel() // Important
}

And this is my ViewModel class:

class MyViewModel(private val dispatcher: CoroutineDispatcher) : ViewModel() {

    private val _numbers = MutableStateFlow(0)
    val numbers: StateFlow<Int> = _numbers

    fun addNumber(number: Int) {
        viewModelScope.launch(dispatcher) {
            _numbers.value = number
        }
    }
}

Please note that I'm using Kotlin 1.6.10 and kotlinx.coroutines-test 1.6.1:

testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.1")

Also, see the official Kotlin coroutines migration guide to the new test API.

Mahozad
  • 18,032
  • 13
  • 118
  • 133
  • Does your way work when you make multiple updates inside the viewModelScope: _numbers.value = number1, _numbers.value = number2 ? For me, it skips the first update. – Rowan Gontier Mar 29 '23 at 05:30
  • Tried this solution, As "Rowan" mentioned , first value emit is ignored, how to fix it ? – Sam Aug 10 '23 at 06:06
3

runBlockingTest just skips the delays in your case but not override the dispatcher used in the ViewModel with your test dispatcher. You need to inject TestCoroutineDispatcher to your ViewModel or since you are using viewModelScope.launch {} which already uses Dispatchers.Main by default, you need to override the main dispatcher via Dispatchers.setMain(testCoroutineDispatcher). You can create and add the following rule to your test file.

class MainCoroutineRule(
        val testDispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher()
) : TestWatcher() {

    override fun starting(description: Description?) {
        super.starting(description)
        Dispatchers.setMain(testDispatcher)
    }

    override fun finished(description: Description?) {
        super.finished(description)
        Dispatchers.resetMain()
        testDispatcher.cleanupTestCoroutines()
    }
} 

And in your test file

@get:Rule
var mainCoroutineRule = MainCoroutineRule()

@Test
fun `observe user wallets ok`() = mainCoroutineRule.testDispatcher.runBlockingTest {
}

Btw it is always a good practice to inject dispatchers. For instance if you would have been using a dispatcher other than Dispatchers.Main in your coroutine scope like viewModelScope.launch(Dispatchers.Default), then your test will fail again even if you are using a test dispatcher. The reason is you can only override main dispatcher with Dispatchers.setMain() as it can be understood from its name but not Dispatchers.IO or Dispatchers.Default. In that case you need to inject mainCoroutineRule.testDispatcher to your view model and use the injected dispatcher rather than hardcoding it.

Fatih
  • 1,304
  • 1
  • 11
  • 10
2

We can create a coroutine for given and one coroutine for whenever

after whenever code, we can use yield so our given code would be done and ready to assert!

enter image description here

to do that you need to extend CouroutinScope as you can see :

enter image description here

done!

  • you can use emit instead of tryEmit
Vahab Ghadiri
  • 2,016
  • 20
  • 26
1

The issue you are facing is because toList() needs the flow to complete and "State flow never completes" as per the documentation.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html

  • This is indeed the correct answer to why OP sees "This job has not completed yet" error. So the solution would be to just collect the flow in a separate coroutine and in the collect function add values to a mutable list. And yet, this could still lead to some events not being included, because as documentation states, if collector is slow, events are conflated and intermediate values are skipped. – Lukas1 Jan 04 '22 at 07:21
1

This is what I'm using (without the need to customize the VM dispatcher):

...

@get:Rule
val coroutineRule = MainCoroutineRule()
...

@Test
fun `blablabla`() = runTest {
    val event = mutableListOf<SealedCustomEvent>()
    viewModel.screenEvent
        .onEach { event.add(it) }
        .launchIn(CoroutineScope(UnconfinedTestDispatcher(testScheduler)))
    
    viewModel.onCtaClick()
    advanceUntilIdle()

    Assertions.assertThat(event.last()).isInstanceOf(SealedCustomEvent.OnCtaClick::class.java)

    ...more checks
}

Using launchIn and advanceUntilIdle might solve your testing issues.

Daniele
  • 1,030
  • 9
  • 20
  • This is the only solution to catch all the updates on StateFlow that worked for me. You helped me a lot. Thank you so much. I was using a similar way with .launchIn(this). But changing the scope of the launchIn like you solved the problem. – UmutTekin Jul 11 '23 at 22:00
0

Using this with some minor improvement https://github.com/Kotlin/kotlinx.coroutines/issues/3143#issuecomment-1097428912

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import org.junit.Assert.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
/**
 * Test observer for Flow to be able to capture and verify all states.
 */
class TestObserver<T>(
  scope: CoroutineScope,
  testScheduler: TestCoroutineScheduler,
  flow: Flow<T>
) {
  private val values = mutableListOf<T>()

  private val job: Job = scope.launch(UnconfinedTestDispatcher(testScheduler)) {
    flow.collect { values.add(it) }
  }

  /**
   * Assert no values
   */
  fun assertNoValues(): TestObserver<T> {
    assertEquals(emptyList<T>(), this.values)
    return this
  }

  /**
   * Assert the values. Important [TestObserver.finish] needs to be called at the end of the test.
   */
  fun assertValues(vararg values: T): TestObserver<T> {
    assertEquals(values.toList(), this.values)
    return this
  }

  /**
   * Assert the values and finish. Convenient to avoid having to call finish if done last in the test.
   */
  fun assertValuesAndFinish(vararg values: T): TestObserver<T> {
    assertEquals(values.toList(), this.values)
    finish()
    return this
  }

  /**
   * Finish the job
   */
  fun finish() {
    job.cancel()
  }
}

@OptIn(ExperimentalCoroutinesApi::class)
/**
 * Test function for the [TestObserver]
 */
fun <T> Flow<T>.test(
  scope: TestScope
): TestObserver<T> {
  return TestObserver(scope, scope.testScheduler, this)
}

I can now do the following in my test

@Test
fun `test some states`() = runTest {
  val viewModel = ViewModel(
    repository = repository
  )
  val observer = viewModel.state.test(this)
  advanceUntilIdle()
  verify(repository).getData()
  observer.assertValuesAndFinish(
    defaultState,
    defaultState.copy(isLoading = true),
    defaultState.copy(title = "Some title")
  )
}

And my ViewModel

@HiltViewModel
internal class ViewModel @Inject constructor(
  private val repository: Repository
) : ViewModel() {

  private val _state = MutableStateFlow(State())
  val state: StateFlow<State> = _state

  init {
    fetch()
  }

  private fun fetch() {
    _state.value = state.value.copy(
      isLoading = true
    )
    val someData = repository.getData()
    _state.value = state.value.copy(
      isLoading = false,
      title = someData.title
    )
  }
}
latsson
  • 630
  • 7
  • 8
0

This also might help, let's try to mock a full scenario:

// On your VM
class MyViewModel(
    ...
) : ViewModel() {
    val listItems = combine(...) {
        ...
    }.stateIn(
        scope = viewModelScope, 
        started = SharingStarted.WhileSubscribed(), 
        initialValue = null,
    )
}
// On the test file
class MyViewModelTest {
    private val testCoroutineDispatcher = StandardTestDispatcher(TestCoroutineScheduler())

    @BeforeEach
    fun setUp() {
        // in case you're initializing your VM here just make sure
        // to setMain before VM initialization
        Dispatchers.setMain(testCoroutineDispatcher)
    }

    @AfterEach
    fun tearDown() {
        Dispatchers.resetMain()
    }

    @Test
    fun `given X When Y Then Z`() = runTest {
        // Given
        ...

        // When
        // Attach your observer
        val observer = viewModel.listItems.attachFlowObserver(scope = this)
        viewModel.fetchData()
        advanceUntilIdle() // this is needed!

        // Then
        // If you're interested in all values check:
        // observer.getEmittedValues()
        // Do your assertions here
        val results = observer.getLastValue()
        assertEquals(...)

        // Finish the observer
        observer.finish()
    }
}

I hope this is helpful.

Yasser AKBBACH
  • 539
  • 5
  • 7