
Monix uses `Ack` to synchronize emitted messages, but when I use `groupBy` and `flatMap`, the inner Observable does not seem to follow the back-pressure of the source.

Please see this test code:

```scala
import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test

class MonixBackpressureWithGroupByTest2 {
  @Test
  def test(): Unit = {
    val source = Observable.range(0, 130)

    val backPressuredStream = source
      .map { x =>
        println("simple log first  map - " + x)
        x
      }
      // asynchronous boundary with a back-pressured buffer of 5 elements
      .asyncBoundary(OverflowStrategy.BackPressure(5))
      .map { i =>
        // `+` and `->` share the same precedence, so this prints a tuple,
        // which is why these log lines appear in parentheses in the output
        println("after backpressure map, and Rim 3 operation of source - " + (i % 3).toString -> i)
        (i % 3).toString -> i
      }
      // one inner Observable per key "0", "1", "2"
      .groupBy { case (k, _) => k }
      .flatMap { group =>
        group.map { groupedMsg =>
          Thread.sleep(2000) // simulate slow processing of every message
          println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
          groupedMsg
        }
      }

    backPressuredStream.share.subscribe(
      (keyAndValue: (String, Long)) => Continue
    )

    // print a tick every second to make the timing visible
    global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
      println("========sleep 1 second ============")
    })

    // keep the test thread alive so the asynchronous stream can run
    Thread.currentThread().join()
  }
}
```

Output:

```
...

========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...
```

It looks like the back-pressure does not line up: after each `sleep 2 second for every message ...` line, three `after backpressure map - ...` lines are emitted.

How can I make `sleep 2 second for every message ...` and `after backpressure map - ...` correspond one-to-one in terms of back-pressure?

Another question: why does the `sleep 2 second for every message` log show (0,72), (0,75), (0,78) instead of something like (0,72), (1,73), (2,74)?

Thanks.

Monix version: `"io.monix" %% "monix" % "3.0.0-RC1"`

LoranceChen
  • Grammatical errors in your question make it hard to understand what you're asking. As we are all humans and no one expects you to speak (or write) perfect English, can you at least try to explain it in more detail? – Markus Appel Apr 25 '19 at 11:16
  • Maybe also instead of writing `thread -`, `2222 - ` and `=====`, make your log messages a little more descriptive so we can understand what you're trying to show. – Markus Appel Apr 25 '19 at 11:17
  • @MarkusAppel, hi, I have improved the log messages and variable names. Sorry, my English is also poor. – LoranceChen Apr 26 '19 at 01:52

1 Answer


The behaviour you see is exactly what you can expect.

To quickly summarize what your application does, here it is in my own words:


You have an Observable that generates numbers and performs a side effect for each element.

Next, you group the elements by `_ % 3`.

Next, you perform more side effects (sleeping and writing to the console) inside each group's Observable.

Then, you `flatMap` over the groups' Observables, resulting in a single, flat Observable.


So why do you, in the beginning, only see the first group (where `_ % 3 == 0`) print to the console? ***

The answer lies in `flatMap`. The documentation for `Observable` gives the following description for `flatMap`:

```scala
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
```

Alias for `concatMap`.

[...]

Think of Observables as Lists for a second: when you concatenate Lists, you end up with a single List containing first the elements of the first List, followed by the elements of the second List, and so on.

In Monix, `Observable` achieves the same behaviour by waiting for the first Observable produced inside a `flatMap` (read: `concatMap`) operation to send the "completed" signal. Only then is the second Observable consumed, and so on.

Or, simply put, `flatMap` preserves the order of the produced Observables.
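The List analogy can be checked directly with plain Scala collections. This is only an illustration: `ConcatAnalogy` and the three lists are hypothetical stand-ins for the groups that `_ % 3` produces from `0 until 9`. `flatMap` on a `List` concatenates, just as `flatMap` (`concatMap`) concatenates Observables.

```scala
// Hypothetical stand-ins for the three groups produced by `_ % 3` over 0 until 9
object ConcatAnalogy {
  val group0: List[Long] = List(0L, 3L, 6L)
  val group1: List[Long] = List(1L, 4L, 7L)
  val group2: List[Long] = List(2L, 5L, 8L)

  // flatMap over a List of Lists concatenates:
  // every element of group0 comes before any element of group1, and so on
  val concatenated: List[Long] = List(group0, group1, group2).flatMap(group => group)

  def main(args: Array[String]): Unit =
    println(concatenated) // List(0, 3, 6, 1, 4, 7, 2, 5, 8)
}
```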

But when do the Observables in your `flatMap` operation complete? To answer this, we have to understand how `groupBy` works, because that is where they come from.

For `groupBy` to work even though Observables are lazily evaluated, it has to store incoming elements in a buffer. I am not 100% sure about this, but if `groupBy` works the way I think it does, then for any grouped Observable that pulls its next element, it keeps consuming the source Observable until it finds an element belonging to that very group, saving all earlier (but not yet required) elements that belong to other groups in that buffer for later use.

All of this means that `groupBy` cannot know whether all elements of a group have been found until the source Observable signals completion; only then does it emit the remaining buffered elements and signal completion to the grouped Observables.

In simpler words: Observables produced by `groupBy` do not complete until the source Observable completes.

Putting all this together: only when the source Observable (your `Observable.range(0, 130)`) has completed will the first grouped Observable complete as well, and because of `flatMap`, only then will the other grouped Observables be consumed.
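To make the timing concrete, here is a simplified, hypothetical model in plain Scala. It is not how Monix implements `groupBy`, and it ignores threads and back-pressure entirely. With `concat = true` (modelling `flatMap`/`concatMap`) the downstream is subscribed only to the first group, so elements of the other groups wait in a per-key buffer until the source completes; with `concat = false` (modelling `mergeMap`) every element is delivered as soon as it arrives. `GroupDeliveryModel` and `deliver` are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical model, not Monix internals: which elements reach downstream while the
// source is still running, and which only after the source completes.
object GroupDeliveryModel {
  /** concat = true models flatMap/concatMap, concat = false models mergeMap */
  def deliver(source: Seq[Long], key: Long => Long, concat: Boolean): (Seq[Long], Seq[Long]) = {
    val whileRunning = Vector.newBuilder[Long]
    // buffered groups, kept in the order their key was first seen
    val buffers = mutable.LinkedHashMap.empty[Long, Vector[Long]]
    // the only group a concatenating downstream is subscribed to
    var activeKey: Option[Long] = None

    for (x <- source) {
      val k = key(x)
      if (activeKey.isEmpty) activeKey = Some(k)
      if (!concat || activeKey.contains(k)) whileRunning += x   // delivered immediately
      else buffers(k) = buffers.getOrElse(k, Vector.empty) :+ x // waits in the buffer
    }

    // The source has completed, so every group completes and a concatenating
    // downstream moves on to the buffered groups, one after another.
    val afterCompletion = buffers.values.flatten.toVector
    (whileRunning.result(), afterCompletion)
  }
}
```

For `0L until 9L` keyed by `_ % 3`, `deliver(..., concat = true)` yields `0, 3, 6` while the source runs and `1, 4, 7, 2, 5, 8` only after completion. In the model, each element of group 0 is accompanied by two buffered elements of groups 1 and 2, which fits the three `after backpressure map` lines per slow message in the question's log.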

Since I know from your last question that you're trying to build a web socket, using `flatMap` here is a bad idea: your source Observable of incoming requests will never complete, so you would effectively only serve the very first IP address you come across.
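As a rough analogy with lazy Scala iterators: concatenation never moves past a sequence that never ends. `NeverCompletingConcat`, `firstClient` and `secondClient` are hypothetical names; the infinite iterator plays the role of a grouped Observable whose source never completes.

```scala
object NeverCompletingConcat {
  // Hypothetical client whose requests never end, like a group whose source never completes
  def firstClient: Iterator[String] = Iterator.from(0).map(i => s"client-A request $i")
  // Hypothetical second client with a finite number of requests
  def secondClient: Iterator[String] = Iterator("client-B request 0", "client-B request 1")

  // Concatenation is lazy, but it only switches to secondClient once firstClient
  // is exhausted, which never happens
  def served(n: Int): List[String] = (firstClient ++ secondClient).take(n).toList

  def main(args: Array[String]): Unit =
    served(5).foreach(println) // only client-A requests 0 to 4 are printed
}
```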

What you should use instead is `mergeMap`. Compared to `concatMap`, `mergeMap` does not care about the order of the inner Observables; instead, the "first come, first served" rule applies.
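A minimal sketch of the question's pipeline with `flatMap` swapped for `mergeMap`, assuming Monix 3.x on the JVM with Scala 2.12. The range is shortened to 12 elements, `Thread.sleep(100)` stands in for the slow per-message work, and `MergeMapSketch` is a hypothetical name. Every group is subscribed as soon as `groupBy` emits it, so elements of groups 1 and 2 no longer have to wait for group 0 to complete. The exact interleaving depends on scheduling, so the sketch only collects the delivered pairs rather than relying on their order.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration._

import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object MergeMapSketch {
  def run(): List[(Long, Long)] = {
    // thread-safe collector, since groups may be processed on different threads
    val delivered = new ConcurrentLinkedQueue[(Long, Long)]()

    val stream = Observable.range(0, 12)
      .groupBy(i => i % 3)      // one GroupedObservable per key 0, 1, 2
      .mergeMap { group =>      // subscribes to each group as soon as it is emitted
        group.map { i =>
          Thread.sleep(100)     // stand-in for the slow per-message work in the question
          group.key -> i
        }
      }

    // foreach subscribes and returns a Future that completes when the stream completes
    Await.result(stream.foreach(kv => delivered.add(kv)), 30.seconds)
    delivered.asScala.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```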


***: When you reach the end of my explanation and hopefully understand how `groupBy` and `flatMap` work, you will understand why I wrote "in the beginning"!

Markus Appel
  • Thanks for your patience! As you said, the problem occurred while I was building a socket library, hosted on [github](https://github.com/LoranceChen/RxSocket). `mergeMap` is a good choice when each socket is one grouped Observable of events. Another question about `groupBy`: if the remote address is the group key, how can I unsubscribe a disconnected socket so that `groupBy` does not keep buffering a useless Observable? – LoranceChen Apr 27 '19 at 10:56
  • @LoranceChen I think that very much depends on how you wrap the socket into an `Observable` in the first place. But if you merge the grouped `Observable`s, you won't be able to unsubscribe from single groups, because there is only one `Subscriber` for everything. But you might not have to unsubscribe after all; it won't bring any real improvement. Still, in your Socket -> `Observable` implementation you could probably send a completed signal once the socket closes. – Markus Appel Apr 29 '19 at 08:08