8

I'm using RxViz to simulate different actions that come every second. When I try

Rx.Observable.create(obs => {
  obs.next([1, 2, 3]); // or could be ['aaa', 'bbbb', 'ccc']
  obs.complete();
}).delay(1000);

on https://rxviz.com

or on my own with a console.log

it keeps displaying the three numbers 1, 2, 3 at the same time.

There's a post about this same problem, but none of the answers work for me. I'm using the latest version of Rx, version 6.

How can I create an observable that emits each value with a delay?

[EDIT] The array can contain anything: numbers, strings, or any objects.

John

7 Answers

15

If you want to delay each value (by 1 sec for example), you may do something like the following:

import { Observable, from, of } from 'rxjs';
import { mergeMap, concatMap, delay } from 'rxjs/operators';

Observable.create(obs => {
  obs.next([1, 2, 3]);
  obs.complete();
})
  .pipe(
    // emit each element of the array separately (not the whole array at once)
    mergeMap((x: any[]) => from(x)),
    // delay each element by 1 sec, one after another
    concatMap(x => of(x).pipe(delay(1000)))
  )
  .subscribe(x => console.log(x));

Here I did not modify the internals of the observable created by you. Instead, I just take your observable and apply appropriate operations to achieve what you seem to be expecting.

siva636
  • I think your solution is nice. It made me look up `concatMap`. One thing: the `of` operator does not take an array and deconstruct it; the `from` operator does. After that, you need to apply a second concatMap to convert each value in the array into an observable with the delay applied to it – elpddev Jun 16 '18 at 11:37
  • I tried it on rxviz, but it only displays a cross icon. I had to convert your code to RxJS 5: Rx.Observable.create(obs => { obs.next([1, 2, 3]); obs.complete(); }) .concatMap(x=>of(x) .delay(1000) ) – John Jun 16 '18 at 12:12
  • 1
    I also tried running it locally, and the whole array is displayed at once instead of each value being emitted one second apart: Observable.create(obs => { obs.next([1, 2, 3]); obs.complete(); }).pipe( concatMap(x=>Observable.of(x)), delay(1000) ) .subscribe(a => console.log(a)) – John Jun 16 '18 at 12:13
  • @John Please check my updated answer with tested working code. – siva636 Jun 16 '18 at 13:59
  • Great solution, thank you! Simple and effective. I found some awful beasts before this :) – Eddie Jaoude Jul 25 '20 at 02:52
10

Here is my solution (very clean)

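import { from, of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

const fakeData = [1, 2, 3];

// Emits each item of fakeData one second after the previous one.
function loadData$() {
  return from(fakeData).pipe(
    concatMap(item => of(item).pipe(
      delay(1000)
    )),
  );
}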
Kris Jobs
4

This works, with a small modification to @siva636's answer:

Rx.Observable.create(obs => {
  // emit each value on its own instead of one array
  obs.next(1);
  obs.next(2);
  obs.next(3);
  obs.complete();
}).concatMap(x => Rx.Observable.of(x).delay(1000));
John
3

Here is a succinct way that builds on the other responses.

from([...Array(10).keys()]).pipe(
    concatMap(x => of(x).pipe(delay(1000)))
).subscribe(y => console.log(y))

A more RxJs native version would be as follows.

const myInterval = rxjs.interval(1000);
myInterval.pipe(rxjs.operators.take(10)).subscribe(x => console.log(x));
2

Here, you emit the whole array [1, 2, 3] in a single emission. You only delay that one emission by 1000 ms, so there is still just one emission.

Even if we emit each value on its own, the delay operator shifts every emission by the same 1000 ms. Because the values are all pushed at the same moment, they still arrive together, one immediately after another:

Rx.Observable.create(obs => {
  var arr = [1, 2, 3];
  arr.forEach(item => obs.next(item));
  obs.complete();
}).delay(1000);
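
To make the difference concrete, here is a minimal plain-JavaScript sketch that models emission times rather than running RxJS. The helper names `delayAll` and `delayEach` are hypothetical: the first mimics what a single `delay(ms)` on the whole stream does, the second mimics the `concatMap(x => of(x).pipe(delay(ms)))` pattern, assuming all source values are pushed at time 0.

```javascript
// Each emission is modelled as { value, time }, with time in ms since subscription.
// This is only a timing model, not RxJS code; the function names are hypothetical.

// Models source.pipe(delay(ms)): every emission is shifted by the same amount.
function delayAll(emissions, ms) {
  return emissions.map(e => ({ value: e.value, time: e.time + ms }));
}

// Models source.pipe(concatMap(x => of(x).pipe(delay(ms)))):
// each value waits for the previous one to finish, then waits ms itself.
function delayEach(emissions, ms) {
  let previousDone = 0;
  return emissions.map(e => {
    const start = Math.max(e.time, previousDone);
    previousDone = start + ms;
    return { value: e.value, time: previousDone };
  });
}

// Three values pushed synchronously, as in the snippet above.
const source = [1, 2, 3].map(value => ({ value, time: 0 }));

console.log(delayAll(source, 1000).map(e => e.time));  // [ 1000, 1000, 1000 ]
console.log(delayEach(source, 1000).map(e => e.time)); // [ 1000, 2000, 3000 ]
```

In this model, a single `delay` keeps the values bunched together, while delaying each value inside `concatMap` spaces them one second apart.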

There is no magic in the create function. If we want an emission every x milliseconds, we have to schedule it ourselves.

We could create an interval that emits those values (taken from learnrxjs):

import { Observable } from 'rxjs';

/*
  Increment value every 1s, emit even numbers.
*/
const evenNumbers = Observable.create(function(observer) {
  let value = 0;
  const interval = setInterval(() => {
    // only emit when the counter is even
    if (value % 2 === 0) {
      observer.next(value);
    }
    value++;
  }, 1000);

  // teardown: stop the timer when the subscriber unsubscribes
  return () => clearInterval(interval);
});
elpddev
  • setInterval is not what I'm looking for, the array in my example is an array of int, but it could be anything like string, or an object. If I use Observable.create, the solution could be something like in pseudo code: Observable.create(obs => { observer.next('aaaa'); observer.delay(1000); // does not work observer.next('bbbb'); observer.delay(1000); observer.next('cccc'); }); – John Jun 16 '18 at 12:16
1

RxJS v7 supports the delayWhen operator [1], so you could write simpler code:

import { delayWhen, interval, of } from 'rxjs';

of("John", "von", "Neumman", "János Neumann").pipe(
  delayWhen((_, index) => interval(index*1000))
).subscribe(console.log);

Check out a demo on https://stackblitz.com/edit/vgibzv?file=index.ts

It works because it delays the emission of the items by 0 ms, 1000 ms, 2000 ms, 3000 ms, and so on.
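
The resulting schedule can be sketched in plain JavaScript without running RxJS. The helper name `delayWhenSchedule` is hypothetical, and the sketch assumes, as with `of(...)`, that all source values arrive at time 0; it only computes the delay each item would receive.

```javascript
// Timing model only (not RxJS code); delayWhenSchedule is a hypothetical helper.
// Models of(...values).pipe(delayWhen((_, index) => interval(index * stepMs))):
// item i is held back until a timer of i * stepMs fires.
function delayWhenSchedule(values, stepMs) {
  return values.map((value, index) => ({ value, delayMs: index * stepMs }));
}

const schedule = delayWhenSchedule(["John", "von", "Neumman", "János Neumann"], 1000);
for (const { value, delayMs } of schedule) {
  console.log(`${delayMs} ms: ${value}`);
}
```

Note that the first item is not delayed at all here, whereas the `concatMap` plus `delay` approach from the other answers also holds back the first item by one interval.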

Another option is the scan operator, with which you build the series from an interval [2].

[1] "RxJS - delayWhen." 16 Dec. 2022, https://rxjs.dev/api/operators/delayWhen

[2] "RxJS - scan." 16 Dec. 2022, https://rxjs.dev/api/index/function/scan

0

The good thing about RxJS is that it is easy to compose custom operators from existing ones, e.g. in your case a delayPerElement operator:

import { delay, concatMap, concatAll, from, of, map, pipe } from 'rxjs';

//Compose a custom operator
const delayPerElement = (delayMs)=>concatMap(x => of(x).pipe(delay(delayMs)))

from([1,2,3,4,5])
.pipe(delayPerElement(1000))
.subscribe(x=>console.log(x))


output: (a number is printed every second)

1
2 
3 
4 
5 

You can go further and create a delayedMap

//Compose a custom operator
const delayedMap = (mapperFunc, delayMs)=>pipe(map(mapperFunc),delayPerElement(delayMs));


from([1,2,3,4,5])
.pipe(delayedMap(x=>x*2, 1000))
.subscribe(x=>console.log(x))

output: (a number is printed every second)

2
4 
6
8 
10 

or even further with delayedConcatMap (e.g. useful for delay between web requests)

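import axios from 'axios';

//Compose a custom operator
//Note: map runs as soon as each source value arrives, so with a synchronous
//source all requests start immediately; only the results are spaced out.
const delayedConcatMap = (...args)=>pipe(delayedMap(...args), concatAll());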

from([1,2,3,4,5])
.pipe(delayedConcatMap(x=>axios.get(`https://jsonplaceholder.typicode.com/todos/${x}`).then(j=>j.data.title), 1000) )
.subscribe(x=>console.log(x))


output: (a text is printed every second)

delectus aut autem
quis ut nam facilis et officia qui
fugiat veniam minus
et porro tempora
laboriosam mollitia et enim quasi adipisci quia provident illum
Marinos An