Can't get previously emitted values from Flow

class TestActivity: ComponentActivity() {
...

private val flowA = MutableStateFlow(0)

private val flowB = MutableStateFlow("")

init {
    flowB.onEach { Log.d("flowtest", "test - $it") }
        .launchIn(lifecycleScope)
}

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    flowB.value = "1"
    flowB.value = "2"
    flowB.value = "3"

    flowA.map { "ignore" }
        .onEach {
            flowB.value = "4"
            flowB.value = "5"
            flowB.value = "6"
        }
        .launchIn(lifecycleScope)

    flowA.value = 0
    
...
}

Expected:

test - 1
test - 2
test - 3
test - 4
test - 5
test - 6

Actual result:

test - 1
test - 2
test - 3
test - 6

What am I missing about how Flow works?

How can I get the previously emitted values?

Edric
  • Please update your question to include the real code you use. Code sample you provided can't even compile and even if it would, it shouldn't behave as you described. It should wait in `collect()` forever and never even get to the line where it emits `3`. – broot Feb 11 '23 at 08:36

2 Answers

In fact, there is something wrong with this code: value is a property of StateFlow, not of SharedFlow. In addition, you can't set a value on the StateFlow after you start collecting it if the collector and the emitter are in the same coroutine, because collect on a StateFlow never returns. So the right code would be:

val flowA = MutableSharedFlow<Int>()

val flowB = MutableStateFlow("")

// Note: this chain has no terminal operator (such as launchIn or collect), so it never runs.
flowA.map { "test $it" }
    .onEach { flowB.value = it }

flowB.value = "1"
flowB.value = "2"
flowB.value = "3"

flowB.collect { println(it) }

and the result will be: 3

because a StateFlow keeps only the last emitted value.

If you want to get all values, you can use a SharedFlow like this:

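import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    val flowA = MutableSharedFlow<Int>()

    // Collector: receives every value emitted after it has subscribed.
    launch {
        flowA.collect {
            println(it)
        }
    }

    launch {
        // With replay = 0, values emitted before the collector subscribes are dropped,
        // so wait until there is at least one subscriber.
        flowA.subscriptionCount.first { it > 0 }
        flowA.emit(1)
        flowA.emit(2)
        flowA.emit(3)
    }
    // Note: collect on a SharedFlow never completes, so this program keeps running.
}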

You can check the documentation for SharedFlow and StateFlow for more information.

  • Sorry for giving too little information and the wrong code. I changed the code and the description. Could you check again, please? You mentioned that **the stateFlow just keeps the last emitted one**, so why are 1, 2, and 3 emitted while 4 and 5 are not? – user2243604 Feb 11 '23 at 12:29

This is a bit of a guess since I'm not bothering to put together a project and test it.

First, there are two things to remember about StateFlow.

  1. It is limited to a history of 1. It can only replay a single value to new subscribers.
  2. It is conflated. This means that if a collector is slower than the emissions coming in, the collector will miss some of those emitted values and only get the latest one.

So, looking at your code, at first glance I would expect you to see only:

test - 3
test - 6

This is because you're emitting on the main thread and collecting on the main thread, so whenever you have multiple StateFlow value changes in a row, I would expect only the last one called in a method to "stick" since the collector is having to wait its turn for the main thread to be relinquished before it can collect its next value.
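The conflation described above can be reproduced outside Android. The following is a minimal sketch, assuming runBlocking's single-threaded event loop as a stand-in for the main thread; collectConflated is a hypothetical helper name, not something from the question.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what a collector sees when the emitter
// and the collector share one thread (runBlocking stands in for Main).
fun collectConflated(): List<String> = runBlocking {
    val state = MutableStateFlow("")
    val seen = mutableListOf<String>()

    val job = launch { state.collect { seen += it } }
    yield() // let the collector start; it receives the initial ""

    // Three updates in a row without giving up the thread.
    state.value = "1"
    state.value = "2"
    state.value = "3"

    yield() // the collector finally runs and only finds the latest value
    job.cancel()
    seen
}

fun main() {
    println(collectConflated()) // prints [, 3]
}
```

The intermediate values "1" and "2" are overwritten before the collector gets a turn, which is the same effect expected for 4 and 5 in the question.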

So why do 1 and 2 appear?

Well, actually, lifecycleScope doesn't use Dispatchers.Main. It uses Dispatchers.Main.immediate, which behaves a little differently. The immediate version runs suspending code immediately in place if you are already on the Main thread, instead of yielding to other coroutines first.

So, I'm theorizing that because you change the value on the main thread while flowB's onEach is being collected on Dispatchers.Main.immediate, the collector gets a chance to run its onEach right in place each time you emit a value to flowB.

But, a reminder, I haven't actually tested flows with immediate to test this hypothesis. It's just the only reason I can think of that would explain the behavior. To test this theory yourself, you can change to using launchIn(lifecycleScope + Dispatchers.Main) in both places and see if 1 and 2 disappear.

Tenfour04
  • Yeah, you're right. I changed the scope as you suggested (lifecycleScope + Dispatchers.Main), and `1` and `2` disappeared. The key point is that `Dispatchers.Main.immediate` guarantees to **run suspending code immediately**. – user2243604 Feb 13 '23 at 06:15
  • So if I want to get all emitted values sequentially and immediately, I should use a context that runs suspending code immediately. Thanks. – user2243604 Feb 13 '23 at 06:27
  • Well, I wouldn’t rely on this. This example is contrived. In practice, you usually aren’t defining your flow in the same class you’re collecting it in. And due to separation of concerns and modularity in your design, you probably don’t want to subject the collecting class to a bunch of rules it needs to follow in order to successfully collect something. I would instead use a SharedFlow with an adequately sized buffer. – Tenfour04 Feb 13 '23 at 12:22
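
The buffered SharedFlow suggestion can be sketched as follows. This is only an illustration under assumptions: the buffer size of 16 is arbitrary, runBlocking again stands in for the main thread, and collectBuffered is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: same single-thread setup as a StateFlow test,
// but with a SharedFlow whose buffer holds values until the collector runs.
fun collectBuffered(): List<String> = runBlocking {
    // extraBufferCapacity = 16 is an assumed size; pick one that fits the burst.
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 16)
    val seen = mutableListOf<String>()

    val job = launch { shared.collect { seen += it } }
    yield() // let the collector subscribe; with replay = 0, earlier values would be lost

    for (v in listOf("1", "2", "3", "4", "5", "6")) {
        // tryEmit does not suspend; it returns false if the buffer is full.
        check(shared.tryEmit(v))
    }

    yield() // the collector drains the buffer in order
    job.cancel()
    seen
}

fun main() {
    println(collectBuffered()) // prints [1, 2, 3, 4, 5, 6]
}
```

Unlike StateFlow, nothing is conflated here, so the collector does not depend on which dispatcher it runs on as long as the buffer is large enough for the burst.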