32

I'm trying to emit simple array values one after another with 500ms in between:

var a = Rx.Observable.from([1,2,3]);
a.interval(500).subscribe(function(b) { console.log(b); });

However, this throws an exception:

Uncaught TypeError: a.interval is not a function.
Bryce
  • 1
    More succinct answers for RxJS 5 here: http://stackoverflow.com/questions/41225446/rxjs5-emit-array-items-over-time-and-repeat-forever – danday74 Dec 19 '16 at 16:16

9 Answers

41

Three ways to do it with RxJS version 6:

1. Using concatMap

import { from, of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

from(array)
 .pipe(
   concatMap(val => of(val).pipe(delay(1000))),
 )
 .subscribe(console.log);

2. Using zip and interval

import { from, interval } from 'rxjs';
import { zip } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

from(array)
 .pipe(
   zip(interval(1000), (a, b) => a),
 )
 .subscribe(console.log);

3. Using interval as source

import { interval } from 'rxjs';
import { map, take } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

interval(1000)
.pipe(
  take(array.length),
  map(i => array[i])
)
.subscribe(console.log);
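
The third approach relies on interval emitting the counter 0, 1, 2, ... and on take(array.length) stopping it after one tick per element. The following is a minimal, timer-free sketch of that index logic in plain JavaScript. It is not RxJS: `ticks` and `take` are hypothetical generator helpers written only for this illustration.

```javascript
// Hypothetical stand-in for interval(): an endless counter 0, 1, 2, ...
function* ticks() {
  let i = 0;
  while (true) yield i++;
}

// Hypothetical stand-in for take(): pass through the first `count` values.
function* take(source, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of source) {
    yield value;
    if (++taken >= count) return;
  }
}

const array = [1, 2, 3, 4, 5];

// Bounded by the array length: exactly one index per element.
const bounded = [...take(ticks(), array.length)].map(i => array[i]);

// Two ticks too many: the extra indices point past the end of the array.
const unbounded = [...take(ticks(), array.length + 2)].map(i => array[i]);

console.log(bounded);
console.log(unbounded);
```

In this sketch the bounded sequence reproduces the array, while the unbounded one ends with two undefined entries, which is why the length bound matters.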
Michel
  • 5
    Beware! The `zip` operator is deprecated in favor of the static `zip`. Source: https://rxjs-dev.firebaseapp.com/api/operators/zip So it becomes: ```import { from, interval, zip } from 'rxjs'; import { map } from 'rxjs/operators'; zip(from(array), interval(1000)).pipe(map(([value]) => value)).subscribe(...)``` – Luckylooke Jul 13 '18 at 06:05
  • 1
    Thanks for all three solutions. The first is the simplest and most straightforward. – varad_s Oct 10 '22 at 17:15
18

As already pointed out by xgrommx, interval is not an instance member of an observable but rather a static member of Rx.Observable.

Rx.Observable.fromArray([1,2,3]).zip(
  Rx.Observable.interval(500), function(a, b) { return a; })
.subscribe(
  function(x) { document.write(x + '<br />'); },
  null,  
  function() { document.write("complete"); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
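
The difference between a static member and an instance member can be reproduced without RxJS. The sketch below uses a hypothetical `FakeObservable` class (an assumption made for illustration, not the library's implementation) whose `interval` is defined only on the class, which leads to the same kind of TypeError as in the question.

```javascript
// Hypothetical stand-in for an observable class; not the RxJS implementation.
class FakeObservable {
  constructor(values) {
    this.values = values;
  }

  // Static factory: called on the class itself.
  static from(values) {
    return new FakeObservable(values);
  }

  // Also static: it lives on the class, not on instances.
  static interval(periodMs) {
    return new FakeObservable([`tick every ${periodMs}ms`]);
  }
}

const a = FakeObservable.from([1, 2, 3]);

// Static members are not visible on instances.
const onClass = typeof FakeObservable.interval;
const onInstance = typeof a.interval;

let errorMessage = null;
try {
  a.interval(500); // same mistake as in the question
} catch (err) {
  errorMessage = err.message;
}

console.log(onClass, onInstance, errorMessage);
```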
Oliver Weichhold
10

This is how I would do it:

var fruits = ['apple', 'orange', 'banana', 'apple'];
var observable = Rx.Observable.interval(1000).take(fruits.length).map(t => fruits[t]);
observable.subscribe(t => {
  console.log(t);
  document.body.appendChild(document.createTextNode(t + ', '));
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
Michael Kang
  • 1
    interesting but this would not satisfy the general case of wanting to output items coming from an event stream at fixed interval of time (which OP seems to want based on example) – Oliver Nov 16 '17 at 16:33
1
var arrayList = [1,2,3,4,5];

var source = Rx.Observable
      .interval(500/* ms */)
      .take(arrayList.length); // emits 0, 1, 2, ... which serve as array indices

source.subscribe(function(idx){
    console.log(arrayList[idx]);
   //or document.write or whatever needed
});
silverfighter
  • 1
    interesting but this would not satisfy the general case of wanting to output items coming from one event stream at fixed interval of time – Oliver Nov 16 '17 at 16:31
1

Pretty late, but a simpler solution would be:

const arr = ["Hi,", "how", "may", "I", "help", "you?"];

Rx.Observable.interval(500)
             .takeWhile(_ => _ < arr.length)
             .map(_ => arr[_])
             .subscribe(_ => console.log(_))    
Ishan Soni
1

I find Weichhold's technique to be the best, but its intent would be clearer if the zipped value were extracted outside of the zip:

// assume some input stream of values:
var inputs = Obs.of(1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8);
// emit each value from stream at a given interval:
var events = Obs.zip(inputs, Obs.interval(1000))
    .map(val => val[0])
    .forEach(console.log);
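
To make the pairing concrete without timers, here is a small sketch on plain arrays. `zipArrays` is a hypothetical helper written for this illustration (it is not an RxJS function); it pairs items by position and stops at the shorter input, which is roughly how the interval only paces the values rather than adding any.

```javascript
// Hypothetical helper: pairs items by position, stopping at the shorter input.
// With a selector it returns selector(left, right); without one it returns pairs.
function zipArrays(left, right, selector) {
  const length = Math.min(left.length, right.length);
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push(selector ? selector(left[i], right[i]) : [left[i], right[i]]);
  }
  return out;
}

const inputs = [1.2, 2.3, 3.4];
const tickNumbers = [0, 1, 2, 3, 4]; // stands in for the interval's counter

// Selector inside the zip, as in Weichhold's answer.
const withSelector = zipArrays(inputs, tickNumbers, (value, tick) => value);

// Value extracted after the zip, as in this answer.
const mappedAfter = zipArrays(inputs, tickNumbers).map(pair => pair[0]);

console.log(withSelector);
console.log(mappedAfter);
```

Both variants yield the same values; the second only moves the projection into a separate step.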
Oliver
1

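If you want to release samples over time, you can do something like this:

import { interval } from 'rxjs';
import { map, sampleTime, scan } from 'rxjs/operators';

const observable = interval(100).pipe(
  scan((acc, value) => [value, ...acc], []), // newest value first
  sampleTime(10000),                         // look at the accumulator every 10 s
  map((acc) => acc[0])                       // release the most recent value
);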
Gabriel Machado
0

I had a slightly different requirement: my array kept being updated over time as well. So basically I had to implement a queue that I could dequeue at a regular interval, but I didn't want to use an interval.

If somebody needs something like this, this solution may help:

I have a function createQueue() that takes the array as input and returns an Observable, which we subscribe to in order to receive items from the array at a regular interval. The function also replaces the push() method of the passed array so that the Observable emits whenever an item is pushed to the array.

createQueue(queue: string[]) {
  return Observable.create((obs: Observer<void>) => {
    const arrayPush = queue.push;
    queue.push = (data: string) => {
      const returnVal = arrayPush.call(queue, data);
      obs.next();
      return returnVal;
    }
  }).pipe(switchMap(() => {
    return from([...queue])
      .pipe(
        concatMap(val => of(val)
        .pipe(delay(1000)))
      );
  }), tap(_ => queue.shift()))
}

Let's say that the array is: taskQueue = [];

So, we need to pass it to the above function and subscribe to it.

  createQueue(taskQueue).subscribe((data) => {
    console.log('Data from queue => ', data);
  });

Now, every time we do taskQueue.push('<something here>'), the subscription will trigger after a delay of "1000ms".

Please note: we should not assign a new array to taskQueue after createQueue() has been called, or else we will lose the modified push().
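
The reason reassignment breaks things is that the patched push() belongs to one specific array object. A minimal sketch in plain JavaScript, assuming a hypothetical `watchPushes` helper that patches push() in the same spirit as createQueue() but without any Observable:

```javascript
// Hypothetical helper: wraps push() on one specific array object.
function watchPushes(queue, onPush) {
  const originalPush = queue.push;
  queue.push = (...items) => {
    const newLength = originalPush.apply(queue, items);
    onPush(items); // notify the listener about the new items
    return newLength;
  };
}

let taskQueue = [];
const seen = [];
watchPushes(taskQueue, (items) => seen.push(...items));

taskQueue.push('first');  // goes through the patched push()
taskQueue = [];           // a brand-new array with the ordinary push()
taskQueue.push('second'); // no longer observed

console.log(seen);
```

After the reassignment, only the item pushed to the original array has been observed.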

Here is a dummy example for the above implementation: Test Example

Ashish Ranjan
-3

An Rx.Observable instance doesn't have an interval method (see http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/index.html). You can use it like this:

Rx.Observable.interval(500)
             .map(function(v) { return [1,2,3];})
             .subscribe(console.log.bind(console));
Paul Rooney
xgrommx