This is the current implementation Observable[Symbol.asyncIterator]
.
Here's a basic example of Symbol.asyncIterator
implemented on an array:
const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));
const items = [1, 2, 3];
items[Symbol.asyncIterator] = async function * () {
yield * await this.map(v => dummyPromise(v, v));
}
!(async () => {
for await (const value of items) {
console.log(value);
}
})();
/*
1 - after 1s
2 - after 2s
3 - after 3s
*/
The way I understand generators(sync generators) is that they are pausable functions, meaning that you can request a value right now and another value 10 seconds later. The async generators follow the same approach, except that the value they produce is asynchronous, which means that you'll have to await
for it.
For instance:
const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));
const items = [1, 2, 3];
items[Symbol.asyncIterator] = async function * () {
yield * await this.map(v => dummyPromise(v, v));
}
const it = items[Symbol.asyncIterator]();
(async () => {
// console.log(await it.next())
await it.next();
setTimeout(async () => {
console.log(await it.next());
}, 2000); // It will take 4s in total
})();
Going back to the Observable
's implementation:
async function* coroutine<T>(source: Observable<T>) {
const deferreds: Deferred<IteratorResult<T>>[] = [];
const values: T[] = [];
let hasError = false;
let error: any = null;
let completed = false;
const subs = source.subscribe({
next: value => {
if (deferreds.length > 0) {
deferreds.shift()!.resolve({ value, done: false });
} else {
values.push(value);
}
},
error: err => { /* ... */ },
complete: () => { /* ... */ },
});
try {
while (true) {
if (values.length > 0) {
yield values.shift();
} else if (completed) {
return;
} else if (hasError) {
throw error;
} else {
const d = new Deferred<IteratorResult<T>>();
deferreds.push(d);
const result = await d.promise;
if (result.done) {
return;
} else {
yield result.value;
}
}
}
} catch (err) {
throw err;
} finally {
subs.unsubscribe();
}
}
From my understanding:
values
is used to keep track of synchronous values
If you have of(1, 2, 3)
, the values
array will contain [1, 2, 3]
before it even reached while(true) { }
. And because you're using a for await (const v of ...)
,
you'd be requesting values as if you were doing it.next(); it.next(); it.next() ...
.
Put differently, as soon as you can consume one value from your iterator, you're immediately requesting for the next one, until the data producer has nothing to offer.
deferreds
is used for asynchronous values
so at your first it.next()
, the values
array is empty(meaning that the observable did not emit synchronously), so it will fall back to the last else
, which simply creates a promise that is added to deferreds
, after which that promise is await
ed until it either resolves
or rejects
.
When the observable finally emits, deferreds
won't be empty, so the awaited promise will resolve
with the newly arrived value.
const src$ = merge(
timer(1000).pipe(mapTo(1)),
timer(2000).pipe(mapTo(2)),
timer(3000).pipe(mapTo(3)),
);
!(async () => {
for await (const value of src$) {
console.log(value);
}
})();
StackBlitz