0

I have a state$ stream that constains messages$s which is array of messages$ stream. State$ is updated and new messages$ appear.

I want subscriber to handle messages from all messages$ in one sinle stream an I want this this stream contain only correct events.

I try to flatMap merged messages$ every time, but got the problem that old messages$s (which where in previous states$ values) are subsribed multiple timed.

How do I solve this?

let allMessages$ = state$.flatMap(s => { return Observable.merge(s.messages$s) } ) allMessages$.subscribe((x)=>{ console.log('message', x) // message from single message$ appear multiple times })

The problem is that after state$ is updated (with items pushed) old one became to be subscribed multiple times. state$ --s(1)---------s(2)---- message$s[0]. --m1----m2-----------m4-- message$s[1] ---------------m3-------- allMessages$ --m1----m2-----m3----m4 m1 m4

s(1) - when state has 1 message$, s(2) when second message$ is added So allMessages$ fire with messages from item1.

What i want is: state$ --s(1)---------s(2)----- message$s[0] --m1----m2-----------m4-- message$s[1] ---------------m3-------- allMessages$ --m1----m2-----m3----m4

This fildle shows the situation simplified: http://jsfiddle.net/8jFJH/797/

user3743222
  • 18,345
  • 5
  • 69
  • 75
WHITECOLOR
  • 24,996
  • 37
  • 121
  • 181
  • hard to understand what you say. Best is to give an example of input, expected output, and actual output, and explain the discrepancy (as if you would write a test for your function basically). My best bet is that you have to use `flatMapLatest` instead of `flatMap` but hard to say without understanding the specifications of your function. – user3743222 Jan 14 '16 at 21:30
  • Ok tried to draw. The problem is that resulting sream recieves multiple events from from the same streams. – WHITECOLOR Jan 14 '16 at 21:39
  • Actually I managed to do this adding `distinct()` not sure if its correct solution as I'm new to RxJs. – WHITECOLOR Jan 14 '16 at 21:40
  • does that mean your problem is solved? – user3743222 Jan 14 '16 at 21:41
  • I would like to get the confirmation that it is correct solution, or get another one. – WHITECOLOR Jan 14 '16 at 21:43
  • `distinct` allow you to not repeat twice the same value in your stream. So if by any chance, m4 = m1 you won't see that m4 in your stream. So I guess `distinct` is not what you need. Your `message$` source is hot or cold? – user3743222 Jan 14 '16 at 21:46
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/100710/discussion-between-whitecolor-and-user3743222). – WHITECOLOR Jan 14 '16 at 21:56

1 Answers1

2

Based on your simplified situation, these are the subscriptions sequences (you can review the answer here for explanation of hot vs. cold observables, and understanding of subscription flows):

  • emission of state1
    • subscription to typing$
  • emission of state2
    • subscription to typing$
    • subscription to typing2$

Because you use flatMap, you have three subscriptions at the same time. If you use flatMapLatest here is what happens :

  • emission of state1
    • subscription to typing$
  • emission of state2
    • 'unsubscription' (is that even english language) from previous stream emitted in flatMapLatest i.e. Rx.Observable.merge(state.items) i.e. typing$
    • subscription to typing$
    • subscription to typing2$

So try replacing flatMap by flatMapLatest and let me know if that solves the problem.

Another way to solve this could also to work with a stream of state changes instead of with the whole state (kind of what redux does for react).

Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75
  • Yes it solves) thank you very much. As Well have to add share() to startWith. – WHITECOLOR Jan 15 '16 at 00:11
  • Redux way is not functional actually. But if based on state changes, how to remove existing subscrptions (when item is removed from state)? – WHITECOLOR Jan 15 '16 at 00:13
  • yes, that's why the 'kind of'. You're right, removing might complicate too much in this case. Basically to remove you need to terminate a given stream. You can do that by appending a `takeUntil` somewhere, but then you have to manage a stream terminator for each stream. So if there is no performance penalty, it could be better to do the current way. – user3743222 Jan 15 '16 at 00:21