
I have some pre-defined events set to occur at specific times. And I have a timer, like this:

const timer = Rx.Observable.interval(100).timeInterval()
    .map(x => x.interval)
    .scan((total, ms) => total + ms, 0)

The timer emits something close to 100, 200, 300, 400, 500 (in reality it's more like 101, 200, 302, 401, 500..., which is totally fine). I also have some stuff I want to do at certain times. For example, let's say I want to do stuff at the following times:

const stuff = Rx.Observable.from([1000, 2000, 2250, 3000, 5000]);

What I'd like is to combine "stuff" and "timer" so that the resulting stream emits one value per time defined in "stuff", at that time (or ever so slightly later). In this case, that would be t = 1000 ms, 2000 ms, 2250 ms, 3000 ms and 5000 ms. Note: the 2250 one should emit around time 2300 because of the interval size. That's fine. They just can't come early or more than once.

I have one solution, but it's not very good: it restarts "stuff" on every single step (every 100 ms in this case), filters it, and takes 1. I would prefer that once an event is emitted from "stuff", it is gone, so subsequent filters on it don't see those values.
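To make the desired behaviour concrete, here is a minimal plain-JavaScript model of it (no RxJS involved). It assumes the scheduled times can be kept in a sorted array and that the timer ticks at an ideal 100 ms; `drainDue` and `simulate` are hypothetical names used only for this sketch. Each tick removes the entries that have come due, so they can never be emitted early or twice.

```javascript
// Removes and returns every entry at the front of `pending` whose time has
// passed. `pending` must be sorted ascending; it is mutated in place, so an
// entry that has been emitted is gone for all later ticks.
function drainDue(pending, elapsedMs) {
  const due = [];
  while (pending.length > 0 && pending[0] <= elapsedMs) {
    due.push(pending.shift());
  }
  return due;
}

// Simulates an ideal timer that ticks every `stepMs` up to `endMs` and
// records when each scheduled time is actually emitted.
function simulate(times, stepMs, endMs) {
  const pending = [...times].sort((a, b) => a - b);
  const log = [];
  for (let elapsed = stepMs; elapsed <= endMs; elapsed += stepMs) {
    for (const scheduled of drainDue(pending, elapsed)) {
      log.push({ scheduled, emittedAt: elapsed });
    }
  }
  return log;
}

const log = simulate([1000, 2000, 2250, 3000, 5000], 100, 5000);
console.log(log);
```

In this model the 2250 entry is emitted on the 2300 tick, which matches the "ever so slightly later" tolerance described above.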

In the real application, there will be stuff and stuff2 and maybe stuff3...(but I will call them something else!)
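One possible way to handle several lists, sketched in plain JavaScript, is to merge them into a single time-ordered queue before scheduling. This is only an illustration: `mergeSchedules` is a hypothetical helper, and the `stuff2` times are made up.

```javascript
// Hypothetical helper: tags each time with the name of its source list and
// merges everything into one array ordered by time. Array.prototype.sort is
// stable, so entries with equal times keep their input order.
function mergeSchedules(sources) {
  const merged = [];
  for (const [name, times] of Object.entries(sources)) {
    for (const t of times) {
      merged.push({ source: name, t });
    }
  }
  return merged.sort((a, b) => a.t - b.t);
}

const queue = mergeSchedules({
  stuff: [1000, 2000, 2250, 3000, 5000],
  stuff2: [500, 2250, 4000], // made-up times for illustration
});
console.log(queue.map(e => `${e.source}@${e.t}`));
```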

Thanks in advance! I hope that was clear.

mookie the swede

2 Answers


If I've understood what you're after correctly, this should be achievable with a simple projection:

const times$ = stuff.flatMap(x => Rx.Observable.timer(x));

Here's a working sample: https://jsbin.com/negiyizibu/edit?html,js,console,output
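A rough way to see why this gives absolute times rather than gaps between items: assuming the array source hands over all of its items immediately on subscription, every timer is started at (virtually) the same moment. The sketch below models that with a tiny virtual clock in plain JavaScript; `VirtualClock` and its methods are hypothetical and are not RxJS APIs.

```javascript
// A tiny virtual clock: callbacks are queued with an absolute due time and
// run in due-time order when the clock is advanced.
class VirtualClock {
  constructor() {
    this.now = 0;
    this.queue = [];
  }

  // Schedules `fn` to run `delayMs` after the current virtual time.
  setTimeout(fn, delayMs) {
    this.queue.push({ due: this.now + delayMs, fn });
  }

  // Runs every queued callback whose due time is <= `timeMs`, earliest first.
  advanceTo(timeMs) {
    for (;;) {
      this.queue.sort((a, b) => a.due - b.due);
      const next = this.queue[0];
      if (!next || next.due > timeMs) break;
      this.queue.shift();
      this.now = next.due;
      next.fn();
    }
    this.now = timeMs;
  }
}

const clock = new VirtualClock();
const emissions = [];

// Models stuff.flatMap(x => timer(x)): the loop finishes at virtual time 0,
// so each delay is measured from the start, not from the previous item.
for (const t of [1000, 2000, 2250, 3000, 5000]) {
  clock.setTimeout(() => emissions.push({ value: t, at: clock.now }), t);
}

clock.advanceTo(5000);
console.log(emissions);
```

In this model the third emission happens 250 ms after the second, not 2250 ms after it.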

Edit

For the second requirement, try something like this:

const times$ = Rx.Observable
                 .from([{"val":"jeff", "t": 1000}, {"val":"fred", "t": 2500}])
                 .flatMap(x => Rx.Observable.timer(x.t).map(y => x.val));

https://jsbin.com/cegijudoci/edit?js,console,output

Matt Burnell
  • it isn't quite what i want...the array of stuff isn't delays between them, but when they should occur. i could always change that though. lemme see... one thing i'm wondering about though is, stuff won't just be an array of numbers, it'll be something like [ {"t": 1000, "val": "xyz"} ], and really, what i'm after is "xyz" being emitted at time 1000. lemme play with this though... – mookie the swede Jun 11 '16 at 06:52
  • const stuff = Rx.Observable.from([{"val":"jeff", "t": 1000}, {"val":"fred", "t": 2500}]); stuff.flatMap(x => Rx.Observable.timer(x.t)).subscribe(x => console.log(x)); // it just spits out "0". i would like "jeff" at 1000ms, "fred" at 2500ms...not 2500 ms after jeff, but 1500 – mookie the swede Jun 11 '16 at 06:58
  • The solution I offered will emit at the specified times. If you run the sample I included, you'll note that the time between the second and third elements is very brief (250ms, not 2250ms). Or am I still misunderstanding? – Matt Burnell Jun 11 '16 at 10:31
  • yes, it works, thanks! now i just have to wrap my head around why it works. it seems like for every item in the array it would create a new timer observable and that would wait "t" amount, and so time would be time between items, but it's not...it's the way i want...i just don't know why. ugh, this stuff is confusing. – mookie the swede Jun 11 '16 at 22:22
  • You're pretty close. It does indeed create a new timer observable for each element in the source observable (which is created from the array), and then flattens them into a single observable. Because they're all relative to the starting point, not each other, it produces the desired result. When I first started out with Rx, the thing I had the most difficulty with was observables of observables (and the associated operators like flatMap). I'd recommend spending some time getting comfortable with this area of Rx. – Matt Burnell Jun 12 '16 at 04:02

Here's a TypeScript function I wrote based on Matt's solution.

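import {from, timer} from 'rxjs';
// mergeMap is the current name for flatMap, which is a deprecated alias.
import {mergeMap, map} from 'rxjs/operators';

export interface ActionQueueEntry {
    action: string;
    payload?: any;
    delay: number;
}

// Emits each entry after its own delay, measured from the moment of subscription.
export function actionQueue(entries: ActionQueueEntry[]) {
    return from(entries).pipe(mergeMap((entry) => {
        return timer(entry.delay).pipe(map(() => entry));
    }));
}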

const q = actionQueue([
  {action: 'say: hi', delay: 500},
  {action: 'ask: how you are', delay: 2500},
  {action: 'say: im fine', delay: 5000},
]);
q.subscribe(console.log);
ForrestLyman