Is there a technique to break out of an async loop if it does not complete within an expected time period? I have code like this:

(async()=>{
  for await(let t of asynDataStreamOrGenerator){
    //some data processing
  }
  // some other code I need to run, based on whatever data was collected from
  // asynDataStreamOrGenerator within the given time period
})()

If this loop does not complete within a given timespan, I want to break out of the loop and process the request further.

Heretic Monkey
Anurag Vohra
  • Seems like that should be the responsibility of `asyncDataStreamOrGenerator` to me... – Heretic Monkey Oct 12 '21 at 13:10
  • What's wrong with simply checking a flag at each iteration? (It would not break the current await, but is that really important?) – apple apple Oct 12 '21 at 13:12
  • `const startTime = Date.now(); for(....) { if (Date.now() - startTime > XXXXXX) break;` – epascarello Oct 12 '21 at 13:12
  • What is the asynchronous call? Fetch? XHR? – epascarello Oct 12 '21 at 13:13
  • @appleapple What if the `asyncDataStreamGenrator` never sends a value and never sends the close call either? The `for await` will keep everything in stack memory forever. I am designing a consensus algorithm, where every source needs to send the response within a given time frame. If some of such participants are dead!, I mean they do not send values, the loop will be held for ever! – Anurag Vohra Oct 12 '21 at 13:15
  • @HereticMonkey The `asyncDataStreamOrGenerator` is not something I control. – Anurag Vohra Oct 12 '21 at 13:16
  • @AnuragVohra - That sounds like a timeout to me. You can implement that by doing a `Promise.race` on the promised iteration value and a timeout promise. – T.J. Crowder Oct 12 '21 at 13:16
  • @epascarello The code you shared is synchronous, but the problem at hand receives values asynchronously. In my scenario, even the first value can arrive after a long time, and my code doesn't want to wait that long; it should simply break out of the loop if the loop does not finish naturally within a given timespan. Your code checks the elapsed time only when values are coming in. – Anurag Vohra Oct 12 '21 at 13:20
  • Wrap it in something you do control, that you can cancel. `for await ... of` is really for async generators, so you should have opportunity to not generate any more if the timeout lapses (using the technique proposed by @T.J.Crowder). – Heretic Monkey Oct 12 '21 at 13:23
  • @HereticMonkey - That's a very good idea. I suggested something similar below, but nothing quite so elegant. Still getting used to iterators, all these years later. – T.J. Crowder Oct 12 '21 at 13:25
  • What is the asynchronous code? How is the call being made? The better answer depends on the exact details which I asked for after my first comment. – epascarello Oct 12 '21 at 13:28

3 Answers

(See also the community wiki answer I posted with an alternative approach.)

In a comment you've said:

I am designing a consensus algorithm, where every source needs to send the response within a given time frame. If some of such participants are dead!, I mean they do not send values, the loop will be held for ever!

That sounds like a timeout to me. The usual way to implement a timeout is via Promise.race with a promise wrapped around a timer mechanism (setTimeout or similar). Promise.race watches the promises you pass into it and settles as soon as any of them settles (passing on that fulfillment or rejection), disregarding how any of the others settle later.
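
As a minimal sketch of that idea in isolation, before applying it to an async iterator: the `withTimeout` helper and the `TIMED_OUT` sentinel below are names made up for this illustration, not part of any library.

```javascript
// Hypothetical sentinel and helper, for illustration only.
const TIMED_OUT = Symbol("timed out");

// Resolves with TIMED_OUT if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms) {
    let timerId;
    const timer = new Promise(resolve => {
        timerId = setTimeout(resolve, ms, TIMED_OUT);
    });
    // Whichever promise settles first wins the race; the timer is cleared
    // afterwards so it doesn't keep the process alive unnecessarily.
    return Promise.race([promise, timer]).finally(() => clearTimeout(timerId));
}

const slow = new Promise(resolve => setTimeout(resolve, 200, "slow value"));
const fast = new Promise(resolve => setTimeout(resolve, 10, "fast value"));

withTimeout(slow, 50).then(r => console.log(r === TIMED_OUT ? "timed out" : r)); // timed out
withTimeout(fast, 50).then(r => console.log(r)); // fast value
```

Note that the losing promise is not cancelled; `Promise.race` only stops waiting for it.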

To do that, you'll need to loop another way instead of for-await-of and use the promise of the result object directly rather than indirectly. Let's say you have a utility function:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

That returns a promise that is fulfilled after the given number of milliseconds with whatever value you provide (if any).

Then:

(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time
                console.log("Timeout");
            } else {
                // Got a response
                if (result.done) {
                    // Iteration complete
                    console.log("Iteration complete");
                    break;
                }
                // ...some data processing on `result.value`...
                console.log(`Process ${result.value}`);
            }
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
})();

Live Example using random durations for the async iterator's work, but forcing a timeout on the third iteration:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 3 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time
                console.log(`Got timeout in ${elapsed}ms`);
            } else {
                // Got a response
                if (result.done) {
                    // Iteration complete
                    console.log(`Got iteration complete result in ${elapsed}ms`);
                    break;
                }
                // ...some data processing on `result.value`...
                console.log(`Got result ${result.value} to process in ${elapsed}ms`);
            }
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
})();

Here's that example with the timeout on the first iteration, since you seemed concerned about that case:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 1 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time
                console.log(`Got timeout in ${elapsed}ms`);
            } else {
                // Got a response
                if (result.done) {
                    // Iteration complete
                    console.log(`Got iteration complete result in ${elapsed}ms`);
                    break;
                }
                // ...some data processing on `result.value`...
                console.log(`Got result ${result.value} to process in ${elapsed}ms`);
            }
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
})();

If you don't want the processing to hold up collection of the next value, you could not await the processing that you do (perhaps build up an array of the promises for completion of that processing and Promise.all them at the end).
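
A rough sketch of that non-blocking variant follows. It assumes a hypothetical async `processValue` function standing in for the real per-item work, and it bails out on the first timeout; `collectWithoutBlocking` is likewise just an illustrative name.

```javascript
const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

// Hypothetical stand-in for the real per-item work.
async function processValue(value) {
    await delay(20);
    return value * 2;
}

async function collectWithoutBlocking(asyncIterable, timeout) {
    const GOT_TIMEOUT = {};
    const processing = []; // Promises for the processing started so far
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([it.next(), delay(timeout, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT || result.done) {
                break;
            }
            // Start the processing but don't await it here, so the next
            // value is requested right away
            processing.push(processValue(result.value));
        }
    } finally {
        try {
            it.return?.(); // Close the iterator if it needs closing
        } catch { }
    }
    // Wait for all of the processing that was started
    return Promise.all(processing);
}

// Example source for the sketch
async function* source() {
    for (let i = 1; i <= 3; ++i) {
        await delay(10);
        yield i;
    }
}

collectWithoutBlocking(source(), 500).then(results => {
    console.log(results); // logs [2, 4, 6]
});
```

If the processing can reject, it is probably worth attaching a `.catch` handler as each promise is created, since `Promise.all` is only attached at the end.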

Or if you want to bail out of the entire operation:

(async () => {
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log("Timeout");
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log("Iteration complete");
                break;
            }
            console.log(`Got ${result.value}`);
            results.push(result.value);
        }
    } finally {
        try {
            it.return?.();
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();

Live Example:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 3 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example(); // For the example
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            it.return?.();
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();

And again where it times out on the first pass but not every pass (since this bails on the first timeout, we don't see subsequent ones):

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        const ms = i === 1 ? 600 : Math.floor(Math.random() * 100);
        await delay(ms);
        yield i;
    }
}

(async () => {
    const asynDataStreamOrGenerator = example(); // For the example
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    const results = [];
    const it = asynDataStreamOrGenerator[Symbol.asyncIterator]();
    try {
        while (true) {
            const p = it.next();
            const start = Date.now();
            const result = await Promise.race([p, delay(TIMEOUT, GOT_TIMEOUT)]);
            const elapsed = Date.now() - start;
            if (result === GOT_TIMEOUT) {
                // Didn't get a response in time, bail
                console.log(`Got timeout after ${elapsed}ms`);
                break;
            }
            // Got a response
            if (result.done) {
                // Iteration complete
                console.log(`Got iteration complete after ${elapsed}ms`);
                break;
            }
            console.log(`Got value ${result.value} after ${elapsed}ms`);
            results.push(result.value);
        }
    } finally {
        try {
            it.return?.();
        } catch { }
    }
    // ...code here to process the contents of `results`...
    for (const value of results) {
        console.log(`Process ${value}`);
    }
})();

Or some combination of the two. You'll need to tweak this based on what you're really doing, but that's a direction that seems reasonable.


In all of the above:

  • Replace it.return?.(); with if (it.return) { it.return(); } if your environment doesn't support optional chaining yet.
  • Replace catch { } with catch (e) { } if your environment doesn't support optional catch bindings yet.
T.J. Crowder
  • I believe the OP wants a timeout on the whole operation (instead of on individual iterations). – apple apple Oct 12 '21 at 13:25
  • @appleapple - I couldn't tell for sure, but "delay" doesn't sound like "cancel" or bail-out to me. It's a small tweak if so though. I've added that tweak. – T.J. Crowder Oct 12 '21 at 13:27
  • @appleapple Exactly. The code shared above assumes that we will definitely receive a value on time. The async generator can send values at any interval. I wanted to break the loop once a given time has passed, even if I have not received a single value in the meantime. The timeout code in the above solution will only trigger if we receive values; only then will it calculate whether the time has passed. But the problem is "async loop should not take more than X time, even if we have not received a single value". – Anurag Vohra Oct 12 '21 at 13:28
  • @AnuragVohra - *"The code shared above assumes that we will definitely receive a value on time."* No, it doesn't. That's the whole point of the `Promise.race` thing. (Notice no `await` on `for-of`; we get the promise from the async iterator, we don't wait for it to settle before calling `Promise.race`.) I've updated the answer to show the tweak if you want to break (not "delay") the loop. – T.J. Crowder Oct 12 '21 at 13:29
  • @T.J.Crowder Will this `for (const p of asynDataStreamOrGenerator)` code iterate even once if we have not received a single value yet? I am no expert, just asking. My assumption is that it will create a promise only once it receives some data. – Anurag Vohra Oct 12 '21 at 13:34
  • @T.J.Crowder I will test your code and update you back. – Anurag Vohra Oct 12 '21 at 13:48
  • @AnuragVohra - It's the nature of async iterators, they return a *promise* of a result object, which is fulfilled later with the result object (with the `done` and/or `value` properties). I realized I'd kind of forgotten to use those properties so came back here to fix it. :-D See the updated code above. – T.J. Crowder Oct 12 '21 at 13:49
  • @AnuragVohra - Sorry, this is what I get for posting in a rush. You can't use `for-of` on an async iterable. I've fixed the answer and added live examples. I hope it helps. – T.J. Crowder Oct 12 '21 at 14:04
  • @T.J.Crowder The code you have shared is very close to what is required. However, it is not the solution. The last code you posted will wait every `TIMEOUT` ms before breaking. However, the loop is expected to end within a given time frame. The code above will take `n*TIMEOUT` ms in total if every value arrives after `TIMEOUT-1` ms. – Anurag Vohra Oct 12 '21 at 14:22
  • @AnuragVohra - *"The last code you posted will wait every TIMEOUT ms before breaking."* It doesn't do that, there's no delay other than the delay of the async iterable itself if the timeout doesn't occur. Naturally, it waits for the timeout if it doesn't get something from the async iterable in time. One of the examples above keeps going after a timeout (and will not wait a full timeout on each iteration unless that iteration, also, times out), another of the examples above breaks out of the loop on the first timeout. Neither introduces additional delay other than when a timeout happens. – T.J. Crowder Oct 12 '21 at 14:32
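
To illustrate the point made in the comments above, that an async iterator's `next()` hands back a promise immediately even when no value has arrived yet, here is a small sketch. The `silentSource` generator and the `firstResultOrTimeout` function are made up for the demonstration.

```javascript
const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

// Made-up source that takes a long time to produce its first value.
async function* silentSource() {
    await delay(300);
    yield "late value";
}

async function firstResultOrTimeout(asyncIterable, ms) {
    const GOT_TIMEOUT = {};
    const it = asyncIterable[Symbol.asyncIterator]();
    const p = it.next(); // Returns right away; nothing has been produced yet
    const isPromise = p instanceof Promise;
    // The timeout can win the race even though no value was ever received
    const result = await Promise.race([p, delay(ms, GOT_TIMEOUT)]);
    return { isPromise, timedOut: result === GOT_TIMEOUT };
}

firstResultOrTimeout(silentSource(), 50).then(info => console.log(info));
// logs { isPromise: true, timedOut: true }
```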

You can use a timeout Promise (timer in the code) and use Promise.race on each iteration.


The code below prints values up to about 30 (a 1000 ms limit at roughly 30 ms per value), even though the generator could produce 100 of them.

async function wait(ms) {
  return new Promise(r=>setTimeout(r, ms))
}

async function* asynDataStreamOrGenerator() {
  for (let i = 0; i < 100; ++i) {
    await wait(30)
    yield i;
  }
}

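async function* iterate_until(generator, timeout) {
  // One timer for the whole iteration; it rejects once `timeout` ms have passed
  let timer = wait(timeout).then(_=>Promise.reject("TimeOut"))
  // Avoid an unhandled rejection if the generator finishes before the timer fires
  timer.catch(_=>{})
  for (;;) {
    let it = generator.next()
    let result = await Promise.race([timer, it])
    if (result.done) break;
    yield result.value;
  }
}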

{(async () => {
  try {
    for await (let t of iterate_until(asynDataStreamOrGenerator(), 1000)) {
      console.log(t)
    }
  } catch (e) { /* catch timeout, rethrow if needed*/ }
})()}


apple apple

In the comments on the question, Heretic Monkey suggested wrapping the async iterable in another one that implements a timeout. That's a very good idea, because then the code using that wrapper can use for-await-of.

If you want to keep going after a timeout, it looks something like this:

async function* timeoutWrapper(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, timeoutValue)
            ]);
            if (result === timeoutValue) {
                yield timeoutValue;
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

Using it:

for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT, GOT_TIMEOUT)) {
    if (t === GOT_TIMEOUT) {
        // Didn't get a response in time
        console.log("Timeout");
    } else {
        // Got a response
        // ...some data processing on `t`...
        console.log(`Process ${t}`);
    }
}
console.log("Done");

Live Example:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        await delay(200 + i * 100);
        yield i;
    }
}

async function* timeoutWrapper(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, timeoutValue)
            ]);
            if (result === timeoutValue) {
                yield timeoutValue;
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    const GOT_TIMEOUT = {};
    for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT, GOT_TIMEOUT)) {
        if (t === GOT_TIMEOUT) {
            // Didn't get a response in time
            console.log("Timeout");
        } else {
            // Got a response
            // ...some data processing on `t`...
            console.log(`Process ${t}`);
        }
    }
    console.log("Done");
})();
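
One thing to be aware of with the keep-going wrapper: after a timeout it calls `it.next()` again, so the result of the request that was still pending is never read. With an async generator as the source, that means a value that arrives late is skipped. If late values should still be delivered, one possible variation (a sketch only; `timeoutWrapperKeepPending` and `lateSource` are made-up names) is to keep racing the same pending promise until it settles:

```javascript
const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* timeoutWrapperKeepPending(asyncIterable, timeoutDuration, timeoutValue) {
    const it = asyncIterable[Symbol.asyncIterator]();
    let pending = null; // The `next()` request we're still waiting on, if any
    try {
        while (true) {
            if (pending === null) {
                pending = it.next();
            }
            const result = await Promise.race([
                pending,
                delay(timeoutDuration, timeoutValue)
            ]);
            if (result === timeoutValue) {
                // Report the timeout but keep `pending`, so the late
                // value is still delivered on a later pass
                yield timeoutValue;
                continue;
            }
            pending = null;
            if (result.done) {
                break;
            }
            yield result.value;
        }
    } finally {
        it.return?.();
    }
}

// Made-up source whose second value is slower than the timeout used below
async function* lateSource() {
    await delay(10);
    yield 1;
    await delay(130);
    yield 2;
    await delay(10);
    yield 3;
}

(async () => {
    const GOT_TIMEOUT = {};
    for await (const t of timeoutWrapperKeepPending(lateSource(), 50, GOT_TIMEOUT)) {
        console.log(t === GOT_TIMEOUT ? "Timeout" : `Process ${t}`);
    }
    console.log("Done");
})();
```

Here the values 1, 2 and 3 are all processed, with "Timeout" reported while the wrapper is still waiting for 2.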

If you don't want to keep going after a timeout, you could have it throw an error (terminating the iteration) instead:

async function* timeoutWrapper(asyncIterable, timeoutDuration) {
    const TIMEOUT_VALUE = {};
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, TIMEOUT_VALUE)
            ]);
            if (result === TIMEOUT_VALUE) {
                throw new Error(`Timeout after ${timeoutDuration}ms`);
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

Using it:

try {
    for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT)) {
        // Got a response
        console.log(`Process ${t}`);
    }
    console.log("Done");
} catch (e) {
    console.error(e.message);
}

Live Example:

const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* example() {
    for (let i = 1; i <= 6; ++i) {
        await delay(200 + i * 100);
        yield i;
    }
}

async function* timeoutWrapper(asyncIterable, timeoutDuration) {
    const TIMEOUT_VALUE = {};
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([
                it.next(),
                delay(timeoutDuration, TIMEOUT_VALUE)
            ]);
            if (result === TIMEOUT_VALUE) {
                throw new Error(`Timeout after ${timeoutDuration}ms`);
            } else if (result.done) {
                break;
            } else {
                yield result.value;
            }
        }
    } finally {
        it.return?.();
    }
}

(async () => {
    const asynDataStreamOrGenerator = example();
    const TIMEOUT = 500; // Milliseconds
    try {
        for await (const t of timeoutWrapper(asynDataStreamOrGenerator, TIMEOUT)) {
            // Got a response
            console.log(`Process ${t}`);
        }
        console.log("Done");
    } catch (e) {
        console.error(e.message);
    }
})();
T.J. Crowder
  • Adding `let start = Date.now();` before `const asynDataStreamOrGenerator = example();` and `console.log("Duration: ",Date.now()-start);` after the catch block shows that this async iterator runs for more than 500ms (~1700ms on my system). – Anurag Vohra Oct 13 '21 at 03:56
  • @AnuragVohra - There are intentional delays above to show what's happening. There is no significant extra overhead in this. *"async iterator runs for more than 500ms (~1700ms on my system)"* Are you looking at total time? Read the code closely. The timeout being applied above is per iteration (because that's what you seemed to want). If you want something else, just modify the code slightly to do that other thing instead. All the techniques and information you need are already there. – T.J. Crowder Oct 13 '21 at 07:13
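
For the reading of the question in which the time limit covers the whole loop rather than each iteration, one possible adaptation of the wrapper is sketched below. The names `untilDeadline` and `ticker` are made up here. The sketch creates a single timer up front, races every `next()` against it, and simply ends the iteration when the timer wins, so the code after the loop can work with whatever was collected.

```javascript
const delay = (ms, value) => new Promise(resolve => {
    setTimeout(resolve, ms, value);
});

async function* untilDeadline(asyncIterable, totalMs) {
    const DEADLINE = {};
    let timerId;
    // One timer for the entire iteration, started before the first `next()`
    const deadline = new Promise(resolve => {
        timerId = setTimeout(resolve, totalMs, DEADLINE);
    });
    const it = asyncIterable[Symbol.asyncIterator]();
    try {
        while (true) {
            const result = await Promise.race([it.next(), deadline]);
            if (result === DEADLINE || result.done) {
                break; // Out of time, or the source finished on its own
            }
            yield result.value;
        }
    } finally {
        clearTimeout(timerId);
        try {
            // Not awaited on purpose: a stalled source might never settle this
            Promise.resolve(it.return?.()).catch(() => {});
        } catch { }
    }
}

// Made-up source that keeps producing values indefinitely
async function* ticker() {
    for (let i = 1; ; ++i) {
        await delay(30);
        yield i;
    }
}

(async () => {
    const collected = [];
    for await (const t of untilDeadline(ticker(), 200)) {
        collected.push(t);
    }
    // ...code here to process whatever arrived before the deadline...
    console.log(`Collected ${collected.length} values before the deadline`);
})();
```

Because the deadline promise is created once, the loop ends roughly `totalMs` after it starts even if the source never produces a single value.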