1

I'm quite a beginner in RxJS, I'm currently using RxJS@5 and don't understand a behavior of my code

const currentExtentMinutes$ = initialExtentMinutes$
    .merge(selectedExtentMinutes$)
    .distinctUntilChanged()

// We message the worker that
// there is a new extent minutes
currentExtentMinutes$
  .subscribe(currentExtentMinutes => {
      console.log('send current extent', currentExtentMinutes);
      currentExtentMinutes => worker.postMessage({currentExtentMinutes});
  });

This works great, but as soon as I add this other piece of code, the first subscribe doesn't work anymore

sortedTeams$.withLatestFrom(currentExtentMinutes$)
  .subscribe(([teams, extent]) => {
      const d3line = line()
        .x((pt, i) => scaleMinutes.invert(extent[0]) + scaleMinutes.invert(i))
        .y(scaleRanking)
        .curve(curveCardinal.tension(.5));
      const lines = gGraph.selectAll('.team-path').data(teams, _.get('name'));
      lines.enter().append('path')
        .attr('class', 'team-path')
        .style('stroke', team => `rgb(${team.colors[0]})`)
        .style('stroke-width', 7)
        .style('stroke-linecap', 'round')
        .style('stroke-linejoin', 'round')
        .style('fill', 'none')
        .merge(lines)
        .transition(t)
        .attr('d', team => d3line(team.ranking));
  });

Am I doing something wrong ?

1 Answers1

1

I think you might be yet another victim of the hot vs. cold nature of observables. Basically currentExtentMinutes is subscribed twice, once in the first code snippet, and the second time with the use of withLatestFrom. Every subscription to a cold observable will restart the producer, producing values anew (for more details have a look here).

If that is the problem here, then it should be enough to 'share' your cold observable with

const currentExtentMinutes$ = initialExtentMinutes$
    .merge(selectedExtentMinutes$)
    .distinctUntilChanged()
    .share()
Community
  • 1
  • 1
user3743222
  • 18,345
  • 5
  • 69
  • 75