140

Question

For testing purposes, I'm creating Observable objects that replace the observable that would be returned by an actual http call with Http.

My observable is created with the following code:

fakeObservable = Observable.create(obs => {
  obs.next([1, 2, 3]);
  obs.complete();
});

The thing is, this observable emits immediatly. Is there a way to add a custom delay to its emission?


Track

I tried this:

fakeObservable = Observable.create(obs => {
  setTimeout(() => {
    obs.next([1, 2, 3]);
    obs.complete();
  }, 100);
});

But it doesn't seem to work.

Adrien Brunelat
  • 4,492
  • 4
  • 29
  • 42

6 Answers6

198

Using the following imports:

import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/delay';

Try this:

let fakeResponse = [1,2,3];
let delayedObservable = Observable.of(fakeResponse).delay(5000);
delayedObservable.subscribe(data => console.log(data));

UPDATE: RXJS 6

The above solution doesn't really work anymore in newer versions of RXJS (and of angular for example).

So the scenario is that I have an array of items to check with an API with. The API only accepts a single item, and I do not want to kill the API by sending all requests at once. So I need a timed release of items on the Observable stream with a small delay in between.

Use the following imports:

import { from, of } from 'rxjs';
import { delay } from 'rxjs/internal/operators';
import { concatMap } from 'rxjs/internal/operators';

Then use the following code:

const myArray = [1,2,3,4];

from(myArray).pipe(
        concatMap( item => of(item).pipe ( delay( 1000 ) ))
    ).subscribe ( timedItem => {
        console.log(timedItem)
    });

It basically creates a new 'delayed' Observable for every item in your array. There are probably many other ways of doing it, but this worked fine for me, and complies with the 'new' RXJS format.

MikeOne
  • 4,789
  • 2
  • 26
  • 23
  • 2
    Property 'of' does not exist on type 'typeof Observable'. Do you import your Observable with `import {Observable} from 'rxjs/Observable';`? – Adrien Brunelat Feb 23 '17 at 16:59
  • Yep, I use .of extensively so I guess something strange is up with your rx dependencies.. – MikeOne Feb 24 '17 at 21:20
  • I mean you confirm you're not importing with `import {Observable} from 'rxjs'`? If yes, I have some investigation to do... – Adrien Brunelat Feb 24 '17 at 21:29
  • I am importing as import {Observable} from 'rxjs/Observable' – MikeOne Feb 25 '17 at 11:39
  • Did you have any success? – MikeOne Feb 25 '17 at 19:03
  • 1
    From this page: https://www.npmjs.com/package/rxjs. I deduced I had to explicitly import with `import 'rxjs/add/observable/of';`. Do you happen to do the same thing? It's still odd though, since it won't chain with .delay(...) and it shows an error when I try `rxjs/add/observable/delay`... – Adrien Brunelat Feb 27 '17 at 09:24
  • I managed to make it work in a non-testing environment with the import: `import 'rxjs/add/operator/delay'` which adds clarity to the problem. The thing is, in a testing environment, jasmine is not waiting for the delay when calling `fixture.detectChanges()` – Adrien Brunelat Feb 27 '17 at 12:19
  • I have never had those issues I'm afraid, so I can't help you with that.. – MikeOne Feb 27 '17 at 12:23
  • Sure, the testing stuff is kind of off-topic here, but I should be able to handle it now that the way rxjs is imported is clearer for me. – Adrien Brunelat Feb 27 '17 at 12:23
  • Understood. I approved your edits for others that have the same issues. – MikeOne Feb 27 '17 at 12:25
  • What about an observable which throws an error with delay? – Lazar Ljubenović Oct 01 '17 at 13:25
  • 4
    should `of(item.pipe ( delay( 1000 ) ))` be `of(item))).pipe(delay(1000)` trying to pipe the array gave me errors – Don Thomas Boyle Jul 26 '18 at 19:36
  • I believe the updated function is structured incorrectly. @AdrianBer formed the correct answer. – SILENT Aug 18 '18 at 22:32
  • 2
    This is what worked for me with rxjs6: from([1, 2, 3, 4, 5, 6, 7]).pipe(concatMap(num => of(num).pipe(delay(1000)))) .subscribe(x => console.log(x)); – robert Sep 20 '18 at 20:53
  • @robert same, `of(item).pipe( delay(1000) )` worked. `of(item.pipe...` is incorrect – ganjeii Feb 28 '19 at 20:01
  • 1
    @MikeOne 's solution worked for me too. Sad that so much code is necessary for such a simple matter... – Codev Jun 24 '19 at 17:40
162

In RxJS 5+ you can do it like this

import { Observable } from "rxjs/Observable";
import { of } from "rxjs/observable/of";
import { delay } from "rxjs/operators";

fakeObservable = of('dummy').pipe(delay(5000));

In RxJS 6+

import { of } from "rxjs";
import { delay } from "rxjs/operators";

fakeObservable = of('dummy').pipe(delay(5000));

If you want to delay each emitted value try

from([1, 2, 3]).pipe(concatMap(item => of(item).pipe(delay(1000))));
Adrian Ber
  • 20,474
  • 12
  • 67
  • 117
19

What you want is a timer:

// RxJS v6+
import { timer } from 'rxjs';

//emit [1, 2, 3] after 1 second.
const source = timer(1000).map(([1, 2, 3]);
//output: [1, 2, 3]
const subscribe = source.subscribe(val => console.log(val));
Pellet
  • 2,254
  • 1
  • 28
  • 20
  • 6
    Good answer, don't forget to unsubscribe – Sami Oct 24 '19 at 06:02
  • 1
    @Sami this specific instance of `timer` doesn't need to be unsubscribed (it will auto complete after 1 second). This is because `timer` isn't using the second argument of interval which would need to have extra code to mark it completed some how. – Brian Schermerhorn Jun 02 '22 at 18:47
8

It's little late to answer ... but just in case may be someone return to this question looking for an answer

'delay' is property(function) of an Observable

fakeObservable = Observable.create(obs => {
  obs.next([1, 2, 3]);
  obs.complete();
}).delay(3000);

This worked for me ...

microchip78
  • 2,040
  • 2
  • 28
  • 39
0

You can use asyncScheduler, it schedules tasks asynchronously, by putting them on the JavaScript event loop queue. It is best used to delay tasks in time or to schedule tasks repeating in intervals.

Link to the documentation

Jayampathy Wijesena
  • 1,670
  • 1
  • 18
  • 26
-2

import * as Rx from 'rxjs/Rx';

We should add the above import to make the blow code to work

Let obs = Rx.Observable
    .interval(1000).take(3);

obs.subscribe(value => console.log('Subscriber: ' + value));
rink.attendant.6
  • 44,500
  • 61
  • 101
  • 156