
I've been hung up on this topic lately. AsyncIterables and Observables both seem to have stream-like qualities, though they are consumed a bit differently.

You can consume an async iterable like this:

const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }

const main = async () => {
  for await (const number of myAsyncIterable()) {
    console.log(number)
  }
}

main()

You can consume an observable like this:

const { of } = rxjs

of(1, 2, 3).subscribe(x => console.log(x))
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

My overarching question is based on this RxJS PR:

If the observable emits at a pace faster than the loop completes, there will be a memory build up as the buffer gets more full. We could provide other methods that use different strategies (e.g. just the most recent value, etc), but leave this as the default. Note that the loop itself may have several awaits in it, that exacerbate the problem.
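To make the quoted concern concrete, here is a minimal sketch (not the RxJS code) of adapting a push source to an async iterator. The `bufferedIterator` adapter, the `subscribe(onValue)` source shape and the two `strategies` are hypothetical names for illustration; completion and errors are left out. A producer that emits faster than anyone pulls either fills an unbounded buffer or, with a "most recent value" policy, silently overwrites values.

```javascript
// Hypothetical storage policies for values that arrive while nobody is pulling.
const strategies = {
  bufferAll: (buffer, value) => { buffer.push(value); },                     // grows without bound
  latestOnly: (buffer, value) => { buffer.length = 0; buffer.push(value); }, // keeps one value
};

// Hypothetical adapter from a push source to an async iterator.
// `subscribe(onValue)` starts the source; completion and errors are omitted.
function bufferedIterator(subscribe, store) {
  const buffer = [];  // values pushed before anyone asked for them
  const waiting = []; // resolvers of next() calls made before a value arrived
  subscribe(value => {
    if (waiting.length > 0) waiting.shift()({ value, done: false });
    else store(buffer, value);
  });
  return {
    buffer, // exposed only so that its size can be inspected
    next() {
      if (buffer.length > 0) {
        return Promise.resolve({ value: buffer.shift(), done: false });
      }
      return new Promise(resolve => waiting.push(resolve));
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

// A producer that pushes 1000 values synchronously, before any pull happens.
const fastProducer = push => { for (let i = 0; i < 1000; i++) push(i); };

const all = bufferedIterator(fastProducer, strategies.bufferAll);
const latest = bufferedIterator(fastProducer, strategies.latestOnly);
console.log(all.buffer.length, latest.buffer.length); // 1000 1
```

With `latestOnly`, the first pull sees 999 and values 0 to 998 are gone; with `bufferAll` nothing is lost but memory grows with the gap between producer and consumer.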

It seems to me that async iterators inherently do not have a backpressure problem, so is it right to implement Symbol.asyncIterator (@@asyncIterator) on an Observable and default to a backpressure strategy? Is there even a need for Observables in light of AsyncIterables?
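The pull-based half of that claim can be checked with a small sketch: an async generator only runs up to its next `yield` when the consumer calls `next()`, so a slow consumer is never flooded. The `events` log and the 50 ms delay are illustrative only.

```javascript
const events = [];
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Producer: records a step each time it actually produces a value.
async function* numbers() {
  for (let i = 1; i <= 3; i++) {
    events.push(`produce ${i}`);
    yield i; // pauses here until the consumer calls next() again
  }
}

// Deliberately slow consumer.
async function consume() {
  for await (const n of numbers()) {
    events.push(`consume ${n}`);
    await sleep(50);
  }
}

consume().then(() => console.log(events.join(", ")));
// produce 1, consume 1, produce 2, consume 2, produce 3, consume 3
```

This holds because the generator itself is the producer. An async iterator that wraps a push source, like an Observable, cannot pause that source and still has to decide what to do with values that arrive early.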

Ideally, you could show me practical differences between AsyncIterables and Observables with code examples.

richytong
  • What do you mean by "*default to a backpressure strategy*"? – Bergi Jun 10 '20 at 20:08
  • You seem to already have answered the question from the title in the body (async iterables are pull-based, observables are push-based). Do you really want to know about the difference? – Bergi Jun 10 '20 at 20:10
  • @Bergi In particular, the backpressure strategy of allowing a memory build-up as the buffer gets more full (as the contributor seems to imply). And no, it does not sound like a good strategy to me. Also, I may have answered my question just a tad bit, but I was looking for something more fleshed out. – richytong Jun 10 '20 at 20:13
  • If it's not possible to consider Observables as pull sources, is it right to make Observables async iterable? I also wanted to point out for reference that Observables are in a [stage 1 proposal](https://tc39.es/proposal-observable/) – richytong Jun 10 '20 at 20:21
  • Can async iterables do everything observables can? Do we need an Observable in the spec? – richytong Jun 10 '20 at 20:24
  • Yes, an unbounded buffer is *no* backpressure, that's why I was confused by the wording. – Bergi Jun 10 '20 at 20:24
  • "*Do we need an Observable in the spec?*" - that's an opinion-based question we cannot answer here. The text of the proposal you linked should make good arguments for that though. – Bergi Jun 10 '20 at 20:33
  • "*Can async iterables do everything observables can?*" - no. They require a consumer to produce values. Hm, on the other hand the observables in the proposal you linked appear to behave the same. – Bergi Jun 10 '20 at 20:38
  • Very close (iterator vs iterable) duplicate: [What is the difference between async generators and Observables?](https://stackoverflow.com/q/48512319/1048572) – Bergi Jun 16 '20 at 21:37
  • I wish I had found that question, thank you bergi – richytong Jun 16 '20 at 22:54

3 Answers


The main difference is which side decides when to iterate.

In the case of Async Iterators, the client decides by calling await iterator.next(). The source decides when to resolve the promise, but the client has to ask for the next value first. Thus, the consumer "pulls" the data from the source.
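Roughly, for await performs exactly these pulls: it gets the iterator via `Symbol.asyncIterator` and keeps awaiting `next()` until `done` is true. The `pullAll` helper below is a hypothetical, simplified equivalent; the real loop also calls the iterator's `return()` on early exit, which is omitted here.

```javascript
const myAsyncIterable = async function* () { yield 1; yield 2; yield 3; };

// Manual equivalent of `for await (const number of myAsyncIterable())`.
async function pullAll(iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  const received = [];
  while (true) {
    const { value, done } = await iterator.next(); // the consumer asks first
    if (done) break;
    received.push(value);
  }
  return received;
}

pullAll(myAsyncIterable()).then(values => console.log(values)); // [ 1, 2, 3 ]
```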

With Observables, the consumer registers a callback function, which the observable calls as soon as a new value comes in. Thus, the source "pushes" to the consumer.
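For contrast, a minimal push-based sketch. `createSource` is a hypothetical hand-rolled source, not the RxJS API: it keeps a set of callbacks and calls them whenever it decides to emit, without asking the consumer whether it is ready.

```javascript
// Hypothetical minimal push source (not the RxJS API).
function createSource() {
  const observers = new Set();
  return {
    subscribe(onValue) {
      observers.add(onValue);
      return () => observers.delete(onValue); // unsubscribe function
    },
    emit(value) {
      // The source alone decides when values are delivered.
      for (const onValue of observers) onValue(value);
    },
  };
}

const source = createSource();
const seen = [];
const unsubscribe = source.subscribe(value => seen.push(value));
source.emit(1);
source.emit(2); // delivered immediately; the consumer was never asked
unsubscribe();
source.emit(3); // no longer delivered
console.log(seen); // [ 1, 2 ]
```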

An Observable can easily be used to consume an Async Iterator: use a Subject as a "pull" signal and map each signal to the next value of the async iterator. You then call next on the Subject whenever you're ready to consume the next item. Here is a code sample:

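import { Subject, from } from 'rxjs';
import { concatMap, takeWhile, map } from 'rxjs/operators';

// iter is the async iterator to consume, e.g. the result of calling an async generator
const pull = new Subject();
const output = pull.pipe(
  concatMap(() => from(iter.next())), // each pull requests exactly one value
  takeWhile(result => !result.done),  // complete once the iterator is exhausted
  map(result => result.value)
);
//wherever you need this
output.subscribe({
  next: value => {
    // ...handle value, then signal that we're ready for the next item
    pull.next();
  },
  complete: () => pull.complete()
});
pull.next(); // request the first item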
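The same pull-driven idea can be sketched without RxJS. `drain` is a hypothetical helper: it hands values to a push-style callback but only calls `iterator.next()` after the callback's promise has settled, so the consumer's readiness drives the iteration.

```javascript
// Hypothetical helper: delivers values to `onValue` one at a time,
// requesting the next value only after `onValue` has settled.
async function drain(iterator, onValue) {
  while (true) {
    const { value, done } = await iterator.next();
    if (done) return;
    await onValue(value); // the consumer signals readiness by resolving
  }
}

async function* source() { yield "a"; yield "b"; yield "c"; }

const handled = [];
drain(source(), async value => {
  await new Promise(resolve => setTimeout(resolve, 10)); // slow consumer
  handled.push(value);
}).then(() => console.log(handled)); // [ 'a', 'b', 'c' ]
```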

ranger83753992

This is the current implementation of Observable[Symbol.asyncIterator].

Here's a basic example of Symbol.asyncIterator implemented on an array:

const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));

const items = [1, 2, 3];

items[Symbol.asyncIterator] = async function* () {
  // All three promises are created at once; yield* awaits each one in turn
  yield* this.map(v => dummyPromise(v, v));
};

(async () => {
  for await (const value of items) {
    console.log(value);
  }
})();
/*
1 - after 1s
2 - after 2s
3 - after 3s
*/

The way I understand generators (sync generators) is that they are pausable functions: you can request a value right now and another value 10 seconds later. Async generators follow the same approach, except that the values they produce are asynchronous, which means that you have to await them.

For instance:

const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));

const items = [1, 2, 3];
items[Symbol.asyncIterator] = async function* () {
  yield* this.map(v => dummyPromise(v, v));
};

const it = items[Symbol.asyncIterator]();

(async () => {
  await it.next(); // { value: 1, done: false } after 1s

  setTimeout(async () => {
    // Requested at the 3s mark; the promise for 2 was created at the start and
    // already resolved at 2s, so this logs { value: 2, done: false } after about 3s in total
    console.log(await it.next());
  }, 2000);
})();
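The "pausable function" behaviour described above can also be seen with a plain sync generator and no timers: the body does not start when the generator is created, and each `next()` runs it only up to the following `yield`. The `log` array exists only to record execution order.

```javascript
const log = [];

function* pausable() {
  log.push("start");
  yield 1;
  log.push("resumed");
  yield 2;
  log.push("end"); // never reached below: nobody asks for a third value
}

const gen = pausable();
log.push("created");        // nothing in the body has run yet
log.push(gen.next().value); // runs until the first yield
log.push(gen.next().value); // resumes and runs until the second yield
console.log(log); // [ 'created', 'start', 1, 'resumed', 2 ]
```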

Going back to the Observable's implementation:

// Deferred (not shown) is a helper exposing a promise together with its resolve function
async function* coroutine<T>(source: Observable<T>) {
  const deferreds: Deferred<IteratorResult<T>>[] = [];
  const values: T[] = [];
  let hasError = false;
  let error: any = null;
  let completed = false;

  const subs = source.subscribe({
    next: value => {
      if (deferreds.length > 0) {
        deferreds.shift()!.resolve({ value, done: false });
      } else {
        values.push(value);
      }
    },
    error: err => { /* ... */ },
    complete: () => { /* ... */ },
  });

  try {
    while (true) {
      if (values.length > 0) {
        yield values.shift();
      } else if (completed) {
        return;
      } else if (hasError) {
        throw error;
      } else {
        const d = new Deferred<IteratorResult<T>>();
        deferreds.push(d);
        const result = await d.promise;
        if (result.done) {
          return;
        } else {
          yield result.value;
        }
      }
    }
  } catch (err) {
    throw err;
  } finally {
    subs.unsubscribe();
  }
}

From my understanding:

  • values is used to keep track of synchronous values. If you have of(1, 2, 3), the values array will contain [1, 2, 3] before the generator even reaches while (true) { }. And because you're using for await (const v of ...), you'd be requesting values as if you were doing it.next(); it.next(); it.next() ...

    Put differently, as soon as you consume one value from your iterator, you immediately request the next one, until the data producer has nothing left to offer.

  • deferreds is used for asynchronous values. At your first it.next(), the values array is empty (meaning that the observable did not emit synchronously), so execution falls through to the last else, which creates a promise, adds it to deferreds, and awaits it until it either resolves or rejects.

    When the observable finally emits, deferreds won't be empty, so the awaited promise will resolve with the newly arrived value.

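import { merge, timer } from 'rxjs';
import { mapTo } from 'rxjs/operators';

// for await over src$ assumes Observable implements Symbol.asyncIterator, as in the PR
const src$ = merge(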
  timer(1000).pipe(mapTo(1)),
  timer(2000).pipe(mapTo(2)),
  timer(3000).pipe(mapTo(3)),
);

!(async () => {
  for await (const value of src$) {
    console.log(value);
  }
})();
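The values/deferreds logic from the two bullets above can be sketched in plain JavaScript. This is a simplified stand-in, not the PR's code: `fromPushSource` and the `subscribe({ next, complete })` source shape are hypothetical, plain resolver functions replace `Deferred`, and error handling and unsubscription are omitted. The two sample sources mirror of(1, 2, 3) and the timer example.

```javascript
// Simplified stand-in for `coroutine`: pulls that arrive before a value wait
// in `deferreds`; values that arrive before a pull are buffered in `values`.
async function* fromPushSource(subscribe) {
  const deferreds = []; // resolvers for pulls that arrived before a value
  const values = [];    // values that arrived before a pull
  let completed = false;

  subscribe({
    next: value => {
      if (deferreds.length > 0) deferreds.shift()({ value, done: false });
      else values.push(value);
    },
    complete: () => {
      completed = true;
      // Wake up a consumer that is still waiting for a value.
      while (deferreds.length > 0) deferreds.shift()({ value: undefined, done: true });
    },
  });

  while (true) {
    if (values.length > 0) {
      yield values.shift();
    } else if (completed) {
      return;
    } else {
      const result = await new Promise(resolve => deferreds.push(resolve));
      if (result.done) return;
      yield result.value;
    }
  }
}

// Synchronous source, like of(1, 2, 3): everything lands in `values`.
const syncSource = observer => {
  [1, 2, 3].forEach(v => observer.next(v));
  observer.complete();
};

// Asynchronous source, like the timer example: each pull waits in `deferreds`.
const asyncSource = observer => {
  [1, 2, 3].forEach(v => setTimeout(() => observer.next(v), v * 100));
  setTimeout(() => observer.complete(), 400);
};

async function collect(iterable) {
  const out = [];
  for await (const v of iterable) out.push(v);
  return out;
}

collect(fromPushSource(syncSource)).then(console.log);  // [ 1, 2, 3 ]
collect(fromPushSource(asyncSource)).then(console.log); // [ 1, 2, 3 ] after ~400ms
```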


Andrei Gătej
  • When already working with a queue of promises yourself, it's much easier (and more efficient) to [implement the async-iterator interface yourself](https://stackoverflow.com/a/47157577/1048572) than using an async generator function. – Bergi Jun 10 '20 at 21:38
  • Interesting! Your answer reminds me of dataloader. Thank you. I personally find this approach easier to grasp, although it may not be the most efficient – Andrei Gătej Jun 10 '20 at 22:01
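Along the lines of that suggestion, here is a rough sketch of a hand-written async iterator object (no generator function) over a hypothetical `subscribe(onValue)` source that returns an unsubscribe function. One thing the generator version gets from its `finally` block has to be written by hand: for await calls the iterator's `return()` method when the loop exits early, so unsubscription goes there. The `iterate`, `ticks` and `firstThree` names are illustrative.

```javascript
// Hypothetical hand-written async iterator over a push source.
function iterate(subscribe) {
  const values = [];  // values that arrived before a pull
  const waiting = []; // resolvers for pulls that arrived before a value
  const unsubscribe = subscribe(value => {
    if (waiting.length > 0) waiting.shift()({ value, done: false });
    else values.push(value);
  });
  return {
    next() {
      if (values.length > 0) {
        return Promise.resolve({ value: values.shift(), done: false });
      }
      return new Promise(resolve => waiting.push(resolve));
    },
    // Called by for await when the loop exits early (break, throw, return).
    return() {
      unsubscribe();
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

// Hypothetical endless source: emits 0, 1, 2, ... every 10ms.
let active = false;
const ticks = onValue => {
  let n = 0;
  active = true;
  const id = setInterval(() => onValue(n++), 10);
  return () => { clearInterval(id); active = false; };
};

async function firstThree() {
  const out = [];
  for await (const n of iterate(ticks)) {
    out.push(n);
    if (out.length === 3) break; // triggers return(), which unsubscribes
  }
  return out;
}

const result = firstThree();
result.then(out => console.log(out, active)); // [ 0, 1, 2 ] false
```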

Observables are mind-bending, and my understanding could be flawed. But an async iterator is essentially an iterator that hands out promises, which can resolve to future events in a "live" stream of events (a hot observable). It can be implemented with a queue of waiting consumers as follows; clicks that happen while nobody is waiting are simply dropped.

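function* iterateClickEvents(target) {
  const queue = [] // fulfill callbacks of consumers waiting for the next click
  // Hand each click to the oldest waiting consumer; drop it if nobody is waiting
  target.addEventListener('click', e => queue.shift()?.fulfill(e))
  while (true) // a new promise is created only when the consumer asks for one
    yield new Promise(fulfill => queue.push({fulfill}))
}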

//use it
for await (const e of iterateClickEvents(myButton))
  handleEvent(e)

Then you can implement fluent operators like:

class FluentIterable {
  constructor(iterable) {
    this.iterable = iterable
  }
  filter(predicate) {
    return new FluentIterable(this.$filter(predicate))
  }
  async* $filter(predicate) {
    for await (const value of this.iterable)
      if (predicate(value))
        yield value
  }
  async each(fn) {
    for await (const value of this.iterable)
      fn(value)
  }
}

//use it
new FluentIterable(iterateClickEvents(document.body))
  .filter(e => e.target == myButton)
  .each(handleEvent)
  .catch(console.error)

https://codepen.io/ken107/pen/PojZjgB

You could implement a map operator that returns the results of inner iterators. Things get complicated from there.
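A plain map (without the flattening of inner iterators mentioned above) is straightforward. Here is a sketch with standalone async generator operators; the `map`, `filter`, `count` and `toArray` names are illustrative, and these functions could be wrapped in FluentIterable the same way as $filter.

```javascript
// Standalone operators over any async (or sync) iterable.
async function* map(iterable, fn) {
  for await (const value of iterable) yield fn(value);
}

async function* filter(iterable, predicate) {
  for await (const value of iterable) if (predicate(value)) yield value;
}

// Small async source for the example: 1, 2, ..., n.
async function* count(n) {
  for (let i = 1; i <= n; i++) yield i;
}

async function toArray(iterable) {
  const out = [];
  for await (const value of iterable) out.push(value);
  return out;
}

toArray(map(filter(count(6), n => n % 2 === 0), n => n * 10))
  .then(result => console.log(result)); // [ 20, 40, 60 ]
```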

Sarsaparilla