
I receive some data chunk-wise and want to process the data-points with a minimum temporal distance between them. (You could describe it as throttleTime without data loss, or as the opposite of buffer.) After a ton of trying and googling I came up with this solution:

dataChunkReceiver
  .pipe(
    switchMap(chunk => of(...chunk)), // spread chunk of data-points to distinct events
    zip(interval(minimumInterval)), // make sure events don't get executed more often than minimumInterval
    map(itemAndNumber => itemAndNumber[0]) // only forward the data-point
  )

The zip(interval(minimumInterval)) solution works fine with, let's say, mouse events, but not in this combination with switchMap and chunks. It still works like a charm when chunks arrive at intervals shorter than minimumInterval, but not when they arrive more slowly. I can't figure out why; maybe someone can enlighten me?

To see the problem, try my Stackblitz and set chunkInterval (the interval at which new chunks of data arrive) to 2000 while minimumInterval is 400. As you can see, all data-points from each chunk get processed at the same time. When you set chunkInterval to 400 instead, they come out every 500 milliseconds as expected. For 2000 as chunkInterval I would expect the data points at these times (in ms):

2000 | 2500 | 3000 | 4000 | 4500 | 5000 | 6000 | 6500 | 7000

I am sure there must be a subtle misconception in my solution, please help me find it!
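The expected timeline above can be reproduced with a small model in plain TypeScript. This is only a sketch of the desired behaviour, not RxJS code: `spacedEmissionTimes` is a hypothetical helper, the chunk size of 3 is assumed, and the 500 ms spacing is read off the listed timestamps (with 400 the same rule would give 2000 | 2400 | 2800 | 4000 and so on).

```typescript
// Hypothetical helper, not part of RxJS: computes when each data-point
// should be released if consecutive points must be at least
// `minimumInterval` ms apart and no point is dropped.
function spacedEmissionTimes(arrivalTimes: number[], minimumInterval: number): number[] {
  const out: number[] = [];
  for (const arrival of arrivalTimes) {
    // A point may go out as soon as it has arrived AND the previous
    // point went out at least `minimumInterval` ms ago.
    const earliest = out.length === 0 ? arrival : out[out.length - 1] + minimumInterval;
    out.push(Math.max(arrival, earliest));
  }
  return out;
}

// Assumption: three chunks of three data-points each, one chunk every 2000 ms.
const arrivals: number[] = [2000, 2000, 2000, 4000, 4000, 4000, 6000, 6000, 6000];
const expectedTimes: number[] = spacedEmissionTimes(arrivals, 500);

console.log(expectedTimes.join(" | "));
// prints "2000 | 2500 | 3000 | 4000 | 4500 | 5000 | 6000 | 6500 | 7000"
```

The rule "release at max(arrival, previous release + minimumInterval)" is what separates this from plain throttling: late points wait instead of being discarded.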

Heretic Monkey
Paflow
  • Strangely enough, I didn't stumble upon the thread containing the solution in my search, but the thread marked as duplicate does indeed contain the `2. ConcatMap to a timer` solution, which helped. Thanks a lot to whoever found it for me. I still do not know why my solution fails, since its problematic nature is mentioned but not explained in the thread, but anyway. – Paflow Aug 21 '20 at 13:27
  • The `zip` approach isn't working with `chunkInterval: 2000` & `minimumInterval: 400` because the `interval` has already emitted three times by the time the first chunk of 3 items gets emitted. So `zip` can pair the 3 incoming chunk items with the 3 already buffered interval values without having to wait for additional interval values. You can use https://rxmarbles.com/#zip to help you understand when `zip` emits. Btw. `switchMap(chunk => of(...chunk))` can be replaced with `switchMap(chunk => chunk)`, which can be replaced with `switchAll()` – frido Aug 21 '20 at 13:59
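The buffering described in the comment above can be illustrated with a small timing model in plain TypeScript. It is a sketch, not RxJS itself: `zipWithIntervalTimes` and `chunkArrivals` are hypothetical helpers, and the model assumes chunks of 3 items, an interval whose k-th value (counting from 0) appears at (k + 1) * minimumInterval, and a `zip` that emits its k-th pair as soon as both k-th values exist.

```typescript
// Hypothetical model of zipping a source with interval(minimumInterval).
// The k-th output is emitted when BOTH the k-th source item and the k-th
// interval value are available, whichever comes later.
function zipWithIntervalTimes(arrivalTimes: number[], minimumInterval: number): number[] {
  return arrivalTimes.map((arrival: number, k: number): number =>
    Math.max(arrival, (k + 1) * minimumInterval)
  );
}

// Hypothetical helper: arrival time of every item when `chunks` chunks of
// `chunkSize` items arrive every `chunkInterval` ms.
function chunkArrivals(chunkInterval: number, chunks: number, chunkSize: number): number[] {
  const times: number[] = [];
  for (let c = 1; c <= chunks; c++) {
    for (let i = 0; i < chunkSize; i++) {
      times.push(c * chunkInterval);
    }
  }
  return times;
}

// Slow chunks: the interval values are already buffered, so each chunk
// passes through in one burst.
const slowChunks: number[] = zipWithIntervalTimes(chunkArrivals(2000, 3, 3), 400);
console.log(slowChunks.join(" | "));
// prints "2000 | 2000 | 2000 | 4000 | 4000 | 4000 | 6000 | 6000 | 6000"

// Fast chunks: the items are the ones waiting, so the interval sets the pace.
const fastChunks: number[] = zipWithIntervalTimes(chunkArrivals(400, 3, 3), 400);
console.log(fastChunks.join(" | "));
// prints "400 | 800 | 1200 | 1600 | 2000 | 2400 | 2800 | 3200 | 3600"
```

In this model `zip` only spaces the output while the source is ahead of the interval. Once the interval runs ahead, its unused values pile up and the next chunk is released all at once.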
