30

Suppose I have some async iterable objects like this:

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  }, 
};

And for completeness:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

Now, suppose I can concat them like this:

const abcs = async function * () {
  yield * a;
  yield * b;
  yield * c;
};

The (first 9) items yielded will be:

(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of abcs()) {
    xs.push(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs);
})().catch(error => console.error(error));

// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]

But imagine that I do not care about the order, that a, b and c yield at different speeds, and that I want to yield as quickly as possible.

How can I rewrite this loop so that xs are yielded as soon as possible, ignoring order?


It is also possible that a, b or c are infinite sequences, so the solution must not require all elements to be buffered into an array.

sdgfsdh
  • 33,689
  • 26
  • 132
  • 245
  • You can't write a loop at all. For doing things non-sequentially, you will need to dive down into the primitives. – Bergi May 29 '18 at 13:31
  • 1
    Apart from fixing your current code to make it runnable (like @T.J.Crowder suggested), could you please also provide an example where `a`, `b` and `c` actually run at different speeds so that we can observe the expected result? – Bergi May 29 '18 at 13:38
  • @Bergi I have added `sleep`s so that they take different amounts of time – sdgfsdh May 29 '18 at 13:46
  • I keep getting `Undefined is not a function` when I try to run your code. Shouldn't `[asyncIterator]` return an object with a `next()` method? (genuinely asking, never used it before, but it is how sync iterators work) – Máté Safranka May 29 '18 at 13:50
  • Next time, please make your example runnable (I linked this earlier, that polite comment is now gone for some reason: https://meta.stackoverflow.com/questions/358992/), because A) It helps you avoid posting incorrect code as was the case twice with this question, and B) It makes it easy for people to prove that their solutions do or don't work (to themselves before posting, to others afterward). Happy coding! – T.J. Crowder May 29 '18 at 15:09
  • @T.J.Crowder I was unable to post working code to SO because my browser does not support this syntax. I can only execute it using `babel-node` locally. A topic for meta, perhaps? – sdgfsdh May 29 '18 at 15:12
  • @sdgfsdh - Ouch. Probably worth saying that next time, so we can suggest things. If you can't install the latest Chrome or Firefox, both of which support this, you can use Babel's [REPL](http://babeljs.io/repl) to make sure it works, and then copy it to a snippet once you know it's working. HTH. – T.J. Crowder May 29 '18 at 15:19
  • Perhaps you may like to check this [discussion](https://codereview.stackexchange.com/a/260387/105433) – Redu May 05 '21 at 18:16
  • The cleanest solution I've found on npm is here: https://github.com/tungv/async-generator/blob/master/packages/merge/index.js – polkovnikov.ph Dec 16 '21 at 00:21

7 Answers7

14

There is no way to write this with a loop statement. async/await code always executes sequentially, to do things concurrently you need to use promise combinators directly. For plain promises, there's Promise.all, for async iterators there is nothing (yet) so we need to write it on our own:

async function* combine(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}

Notice that combine does not support passing values into next or cancellation through .throw or .return.

You can call it like

(async () => {
  for await (const x of combine([a, b, c])) {
    console.log(x);
  }
})().catch(console.error);
Bergi
  • 630,263
  • 148
  • 957
  • 1,375
  • Does the `return results` actually do anything here? – Patrick Roberts Jun 03 '18 at 04:46
  • 1
    @PatrickRoberts Yes, it collects the final `return` values of the involved generator functions similar to how `Promise.all` does it. Not that it's used often, but you could observe it with `console.log(yield* combine(…))` in an `async function*`. – Bergi Jun 03 '18 at 13:25
  • It took me a moment to realize what you meant. The last return value of `.next()` ends up being `{ value: [], done: true }` even though it's not consumed by the `for await...of` loop, right? – Patrick Roberts Jun 03 '18 at 15:09
  • there is some little mistake in your implantation, beside that, after they fixed it work – pery mimon Oct 06 '19 at 14:20
  • @perymimon Would you mind telling us what that mistake is, or suggest an [edit] for how to fix it (if it's really just minor like a typo)? – Bergi Oct 06 '19 at 15:18
  • @perymimon Sorry, that doesn't look like a minor update to me. Also it seems [your version](https://stackoverflow.com/revisions/50586391/3) broke working with async iterables, instead it worked with a normal iterable containing promises. – Bergi Oct 07 '19 at 09:55
  • ok. revert if you like. it work for `asyncIterator = async function* (){}` on chrome 77 ( the last one). i see that there is different `Symbol` for `asyncIterator` and just `iterator` maybe that is the clue – pery mimon Oct 07 '19 at 10:29
  • @perymimon I just tried out (again) with the `a`,`b`,`c` code from the OP, it works as expected. Notice that the parameter is an array (generalised: iterable - the conversion happens with `Array.from`) of async iterables – Bergi Oct 07 '19 at 11:48
  • and what the types of `a` `b` & `c`? – pery mimon Oct 07 '19 at 14:35
  • @perymimon `a`, `b` and `c` are the async iterables [defined in the OP](https://stackoverflow.com/q/50585456/1048572) – Bergi Oct 07 '19 at 14:44
  • sure. I mean what is the exactly the constructor that create them. – pery mimon Oct 08 '19 at 11:15
  • @perymimon They're plain object literals with a `Symbol.asyncIterator` method. One could alternatively have created them by calling an async generator function. There is no specific constructor, but they implement the [AsyncIterable interface](http://ecma-international.org/ecma-262/#sec-asynciterable-interface). – Bergi Oct 08 '19 at 11:57
  • I don’t think this answer solves the problem. You’re calling `Promise.race` on calls to `iterator.next` from each iterator, which will cause values to be dropped. I wrote a more complete answer which preserves values from each iterator. – brainkim Nov 11 '19 at 23:06
  • @brainkim Try it out and re-read the code. None of the values are dropped. The `Promise.race` finds the iterator that emits a value first, and then advances only that iterator, racing it against the same old promises from the other iterators again and again. – Bergi Nov 11 '19 at 23:18
  • @Bergi Ooooh I see how it works. My bad. Yeah this works. – brainkim Nov 11 '19 at 23:22
  • 1
    The only thing I maybe should add is a `try`/`finally` clause that closes the non-finished iterators in case of an abrupt completion. – Bergi Nov 11 '19 at 23:35
  • @Bergi Yeah, you definitely should add a try/finally block to return all iterators on finish. – brainkim Nov 12 '19 at 03:58
  • One thing I’ve just discovered is that calling Promise.race repeatedly with a promise which never settles is a potential memory leak, and remembered this question. It seems like most if not all would suffer from the same leak. See https://github.com/nodejs/node/issues/17469 for more info. – brainkim Aug 31 '20 at 19:51
  • @brainkim Oh, they still didn't fix that? I was hoping that promises held to their reactions weakly, using the resolver functions as ephemerons, to fix the leak from never-resolving promises. I suppose we could fix this with `const never = {then(){}};`. However, we still got a problem if one async iterator yields barely ever and the other yields very often. To fix that, we'd need to avoid `Promise.race` altogether (or explicitly nullify a reference), or use [a promise library that implements callback cancellation](https://github.com/bergus/creed/blob/cancellation/src/Race.js). – Bergi Aug 31 '20 at 20:12
  • @Bergi I’m deep in the throes of trying to fix this issue, and I think avoiding `Promise.race` entirely is the best solution right now, though I’m not entirely sure yet how that works. You’re right that the same issue would appear if the skew between iterators increases. I am ruling out promise cancellation because I continue to want to use native promises. – brainkim Aug 31 '20 at 20:48
  • 1
    @brainkim Basically we'd have to purposefully deploy the [`Promise` constructor antipattern](https://stackoverflow.com/q/23803743/1048572) and deferred pattern. We wouldn't keep an array of `getNext()`-returned promises at all any more, but just install two handlers with mutable references to the currently racing resolvers: `let resolve, reject; for (const [index, asyncIterator] of asyncIterators.entries()) asyncIterator.next().then(result => { resolve({result, index}); }, err => { reject(err); });` … `const {index, result} = await new Promise((res, rej) => { resolve = res; reject = rej; });`. – Bergi Aug 31 '20 at 20:57
  • How to implement this without using `async/await`? I need it [here](https://github.com/vitaly-t/iter-ops/issues/61). – vitaly-t Dec 18 '21 at 10:56
  • 1
    @vitaly-t Simply transpile it :-) But really, the only `await` is in that `while` loop, so it's rather easy to convert that into a recursive approach – Bergi Dec 18 '21 at 12:15
  • I think something is missing here. When we call `Promise.race` with parameters, we need to track rejections of all the promises that we pass into it, because `combine` needs to reject on the next request then. Or maybe we do that and I just don't see it? – vitaly-t Dec 18 '21 at 12:25
  • 1
    @vitaly-t It does keep track of them - the promise is held in the `nextPromises`, and kept in there until it settles, even if promises from the other iterators fulfill earlier. Once one of the promise rejects, the iterator does throw that error, and closes. – Bergi Dec 18 '21 at 12:29
  • After many tries and fails, I ended up with a [completely my own version of it](https://github.com/vitaly-t/iter-ops/blob/main/src/ops/combine.ts#L103), because I needed one that worked with native promises only (no async/await, no generators), plus to handle a mixed list of `Iterable+AsyncIterable+Iterator+AsyncIterator`. – vitaly-t Dec 19 '21 at 06:01
  • [Saga continues here, in case you want to help](https://stackoverflow.com/questions/70410261/combine-asyncronous-iterables-via-native-promises). – vitaly-t Dec 19 '21 at 10:04
6

If I change abcs to accept the generators to process, I come up with this, see inline comments:

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

Live Example:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
  }, 
};

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

(async () => {
  console.log("start");
  for await (const x of abcs(a, b, c)) {
    console.log(x);
  }
  console.log("done");
})().catch(error => console.error(error));
.as-console-wrapper {
  max-height: 100% !important;
}
T.J. Crowder
  • 1,031,962
  • 187
  • 1,923
  • 1,875
  • 1
    Nice! I think you could simplify by having just a Map of promises, using `sources.set(winner.key, queueNext(winner))` instead of `winner.next =`. That way you wouldn't need the `map` in the `race` call, and without the `next` field also the `sources` initialisation becomes shorter – Bergi May 29 '18 at 15:23
  • @Bergi - You're right. By the time I finally got my head around this, I was *well* over the time budget I'd given myself for it. :-) So since it worked, I stopped. But...yup, you were right, just edited and it works a treat. – T.J. Crowder May 29 '18 at 15:34
  • @Bergi - LOL, good point, once I got rid of the `map`, I didn't need the array in `Promise.race` anymore. :-) I've incorporated some of your changes. I prefer to set `result` to `null` while the promise is pending, early release of the previous object... – T.J. Crowder May 29 '18 at 16:02
  • @T.J.Crowder Added my take on this. Would love to have my answer reviewed. – Ben Jun 03 '18 at 03:46
2

This is a complicated task, so I’m going to break it up into individual parts:

Step 1: logging each value from each async iterable to the console

Before we even think about creating an async iterator we should first consider the task of simply logging each value from each iterator to the console as they arrive. As with most concurrent tasks in javascript, this involves calling multiple async functions and awaiting their results with Promise.all.

function merge(iterables) {
  return Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        console.log(value);
      }
    }),
  );
}

// a, b and c are the async iterables defined in the question
merge([a, b, c]); // a, x, b, y, c, i, j, k, z, Error: you have gone too far!

CodeSandbox link: https://codesandbox.io/s/tender-ives-4hijy?fontsize=14

The merge function logs values from each iterator, but is mostly useless; it returns a promise which fulfills to an array of undefined when all iterators finish.

Step 2: Replacing the merge function with a merge async generator

The next step is to replace console.log calls with calls to a function which pushes to a parent async iterator. To do this with an async generator, we need a little bit more code, because the only way to “push” a value onto an async generator is with the yield operator, which can’t be used in child function scopes. The solution is to create two queues, a push queue and a pull queue. Next, we define a push function which either pushes to the push queue if there are no pending pulls, or enqueues a value to be pulled later. Finally, we have to perpetually yield either values from the push queue if it has values, or promises which enqueue a resolve function to be called by push later. Here’s the code:

async function *merge(iterables) {
  // pushQueue and pullQueue will never both contain values at the same time.
  const pushQueue = [];
  const pullQueue = [];
  function push(value) {
    if (pullQueue.length) {
      pullQueue.pop()(value);
    } else {
      pushQueue.unshift(value);
    }
  }

  // the merge code from step 1
  const finishP = Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        push(value);
      }
    }),
  );

  while (true) {
    if (pushQueue.length) {
      yield pushQueue.pop();
    } else {
      // important to note that yield in an async generator implicitly awaits promises.
      yield new Promise((resolve) => {
        pullQueue.unshift(resolve);
      });
    }
  }
}

// code from the question
(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of merge([a, b, c])) {
    xs.push(x);
    console.log(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs); // ["a", "x", "b", "y", "c", "i", "j", "k", "z"]
})().catch(error => console.error(error));

CodeSandbox link: https://codesandbox.io/s/misty-cookies-du1eg

This almost works! If you run the code, you’ll notice that the xs is correctly printed, but the break statement is not respected, and values continue to be pulled from child iterators, causing the error thrown in c to be thrown, resulting in an unhandled promise rejection. Also note that we don’t do anything with the result of the Promise.all call. Ideally, when the finishP promise settles, the generator should be returned. We need just a little bit more code to make sure that 1. the child iterators are returned when the parent iterator is returned (with a break statement in a for await loop, for instance), and 2. the parent iterator is returned when all child iterators return.

Step 3: stopping each child iterator when the parent iterator is returned, and the parent iterator when every child has returned.

To make sure each child async iterable is correctly returned when the parent async generator is returned, we can use a finally block to listen for the completion of the parent async generator. And to make sure the parent generator is returned when the child iterators return, we can race yielded promises against the finishP promise.

async function *merge(iterables) {
  const pushQueue = [];
  const pullQueue = [];
  function push(value) {
    if (pullQueue.length) {
      pullQueue.pop()(value);
    } else {
      pushQueue.unshift(value);
    }
  }
  // we create a promise to race calls to iter.next
  let stop;
  const stopP = new Promise((resolve) => (stop = resolve));
  let finished = false;
  const finishP = Promise.all(
    Array.from(iterables).map(async (iter) => {
      // we use the iterator interface rather than the iterable interface
      iter = iter[Symbol.asyncIterator]();
      try {
        while (true) {
          // because we can’t race promises with for await, we have to call iter.next manually
          const result = await Promise.race([stopP, iter.next()]);
          if (!result || result.done) {
            return;
          }
          push(result.value);
        }
      } finally {
        // we should be a good citizen and return child iterators
        await iter.return && iter.return();
      }
    }),
  ).finally(() => (finished = true));

  try {
    while (!finished) {
      if (pushQueue.length) {
        yield pushQueue.pop();
      } else {
        const value = await Promise.race([
          new Promise((resolve) => {
            pullQueue.unshift(resolve);
          }),
          finishP,
        ]);
        if (!finished) {
          yield value;
        }
      }
    }

    // we await finishP to make the iterator catch any promise rejections
    await finishP;
  } finally {
    stop();
  }
}

CodeSandbox link: https://codesandbox.io/s/vigilant-leavitt-h247u

There are some things we still need to do before this code is production ready. For instance, values are pulled from the child iterators continuously, without waiting for the parent iterator to pull them. This, combined with the fact that pushQueue is an unbounded array, can cause memory leaks if the parent iterator pulls values at a slower pace than the child iterators produces them.

Additionally, the merge iterator returns undefined as its final value, but you might want the final value to be the final value from the last-completing child iterator.

If you’re looking for a small, focused library which has a merge function like the one above which covers some more use-cases and edge-cases, check out Repeater.js, which I wrote. It defines the static method Repeater.merge, which does what I described above. It also provides a clean API for turning callback-based APIs into promises and other combinator static methods to combine async iterators in other ways.

brainkim
  • 902
  • 3
  • 11
  • 20
1

In case anyone finds it useful, here's a typescript version of the currently accepted answer:


const combineAsyncIterables = async function* <T>(
  asyncIterables: AsyncIterable<T>[],
): AsyncGenerator<T> {
  const asyncIterators = Array.from(asyncIterables, (o) =>
    o[Symbol.asyncIterator](),
  );
  const results = [];
  let count = asyncIterators.length;
  const never: Promise<never> = new Promise(noOp);
  const getNext = (asyncIterator: AsyncIterator<T>, index: number) =>
    asyncIterator.next().then((result) => ({ index, result }));

  const nextPromises = asyncIterators.map(getNext);
  try {
    while (count) {
      const { index, result } = await Promise.race(nextPromises);
      if (result.done) {
        nextPromises[index] = never;
        results[index] = result.value;
        count--;
      } else {
        nextPromises[index] = getNext(asyncIterators[index], index);
        yield result.value;
      }
    }
  } finally {
    for (const [index, iterator] of asyncIterators.entries()) {
      if (nextPromises[index] != never && iterator.return != null) {
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
        void iterator.return();
      }
    }
  }
  return results;
}; 
Ben Elgar
  • 715
  • 10
  • 23
0

I solved this using async generators. (I wish I'd find this question a few days ago, would save me some time) Will gladly hear opinion and criticism.

async function* mergen(...gens) {
  const promises = gens.map((gen, index) =>
    gen.next().then(p => ({...p, gen}))
  );

  while (promises.length > 0) {
    yield race(promises).then(({index, value: {value, done, gen}}) => {
      promises.splice(index, 1);
      if (!done)
        promises.push(
          gen.next().then(({value: newVal, done: newDone}) => ({
            value: newVal,
            done: newDone,
            gen
          }))
        );
      return value;
    });
  }
};

// Needed to implement race to provide index of resolved promise
function race(promises) {
  return new Promise(resolve =>
    promises.forEach((p, index) => {
      p.then(value => {
        resolve({index, value});
      });
    })
  );
}

It took me a bunch of time to find and I got so excited I put it in a npm package :) https://www.npmjs.com/package/mergen

Ben
  • 10,020
  • 21
  • 94
  • 157
  • Your "Usage" on your npm package does not seem to match what the actual usage is. e.g. `const {mergen} = require('mergen.js')` -> `const mergen = require('mergen')` – Patrick Roberts Jun 03 '18 at 04:55
  • `{...p, gen}` What is the result has a value called `gen`? – sdgfsdh Jun 03 '18 at 08:52
  • It's an async iterable - the result must be of the structure: `{ value, done }` – Ben Jun 03 '18 at 11:48
  • Writing your own `race` makes no sense here. You already know the pattern to provide the `gen` in the fulfillment result, you could trivially have added the `index` there as well. – Bergi Jun 03 '18 at 13:28
  • @Bergi That's what I thought starting out and it doesn't work out once you start taking out elements. The indices stored with the promises lose all meaning at that point. – Ben Jun 03 '18 at 15:43
  • @Ben Yes, you will have to do it on every iteration (as your current `race` implementation does anyway) not before the `while` loop – Bergi Jun 03 '18 at 15:51
  • 1
    Suppose you have N generators that generate M promises in aggregate; this algorithm is O(N*M), when ideally it would be O(M). If the 0th promise in the array wins the race every time, you'll call `promises.splice(0, 1)` (which is O(N)) and push a new promise onto the end every time a promise completes. Instead, use a Map, like [this answer](https://stackoverflow.com/a/50587599/54829), which would make deleting promises O(1). – Dan Fabulich Sep 17 '21 at 05:44
0

Solution: IxJS

We can use The Interactive Extensions for JavaScript (IxJS) (docs) to easily achieve that:

import { merge } from 'ix/asynciterable'

const d = merge(a, b, c)

for await (const i of d) {
  console.info('merged:', i)
}

Will get the result:

$ ./src/t.ts 
merged a
merged x
merged b
merged y
merged c
merged i
merged j
merged k
merged z
Error: You have gone too far! 
    at Object.[Symbol.asyncIterator]

Full Code Example

const sleep = ms => new Promise((resolve) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  },
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  },
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  },
};

const d = IxAsynciterable.merge(a, b, c)

async function main () {
  for await (const i of d) {
    console.info('merged', i)
  }
}

main().catch(console.error)
<script src="https://unpkg.com/ix@4.5.2/Ix.dom.asynciterable.es2015.min.js"></script>
Huan
  • 2,876
  • 3
  • 30
  • 49
-3

I hope I understood your question correctly, here's how I'd approach it:

let results = [];

Promise.all([ a, b, c ].map(async function(source) {
    for await (let item of source) {
        results.push(item);
    }
}))
.then(() => console.log(results));

I tried it with three normal arrays:

var a = [ 1, 2, 3 ];
var b = [ 4, 5, 6 ];
var c = [ 7, 8, 9 ];

And it resulted in [1, 4, 7, 2, 5, 8, 3, 6, 9].

Máté Safranka
  • 4,081
  • 1
  • 10
  • 22
  • This is a good attempt, and you have the right idea. However, it does not work if `a`, `b`, or `c` never terminate, which might be the case. I will update the question to make this clear. – sdgfsdh May 29 '18 at 14:02
  • Hmm. So, is it sort of like a `socket_select()` type situation? You have a bunch of potentially infinite sources, and you always want to get the next available value? – Máté Safranka May 29 '18 at 14:14
  • Yes, an iterable, unlike an array, might never end. This is valid: `async function * () { while (true) { yield 0; } }` – sdgfsdh May 29 '18 at 14:16