
I have a large vector of Hyper HTTP request futures and want to resolve them into a vector of results. Since there is a limit on the maximum number of open files, I want to limit concurrency to N futures.

I've experimented with `Stream::buffer_unordered`, but it seems like it executes the futures one by one.

Pietro
  • Could you post the code you have already? – Peter Hall Apr 06 '17 at 09:08
  • Please [edit] your question to explain why you say "seems like it executed futures one by one". I have used `buffer_unordered` for **this exact purpose** and it worked for me. – Shepmaster Apr 06 '17 at 12:51

1 Answer


We've used code like this in a project to avoid opening too many TCP sockets. Those futures wrapped Hyper futures, so this seems to be exactly the same case.

// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done =
    futures::stream::iter(iterator_of_futures.map(Ok))
    .buffer_unordered(PARALLELISM);

// Everything after here is just using the stream in
// some manner, not directly related

let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);

// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
    match core.run(all_done) {
        Ok((None, _)) => break,
        Ok((Some(v), next_all_done)) => {
            successes.push(v);
            all_done = next_all_done.into_future();
        }
        Err((v, next_all_done)) => {
            failures.push(v);
            all_done = next_all_done.into_future();
        }
    }
}

This is used in a piece of example code, so the event loop (core) is explicitly driven. Watching the number of file handles used by the program showed that it was capped. Additionally, before this bottleneck was added, we quickly ran out of allowable file handles, whereas afterward we did not.
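
For context, the `core` above is the tokio-core reactor. A minimal sketch of how it might be set up (this is my assumption that the example uses tokio-core 0.1 and hyper 0.11, as was typical with futures 0.1; it is not the answer's actual setup code):

use tokio_core::reactor::Core;

// Create the event loop that drives the futures to completion.
let mut core = Core::new().expect("unable to create the event loop");

// A Hyper client bound to this event loop; `client.get(uri)` produces the
// kind of request futures that can be fed into `buffer_unordered` above.
let client = hyper::Client::new(&core.handle());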

Shepmaster
  • Accumulation of successes / failures could be done simpler (and in line with the functional/iterator style) with [StreamExt::fold](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.fold) – nirvana-msu Oct 28 '22 at 22:53
  • @nirvana-msu probably. Note that this answer is from 2017 and predates stable `async` / `await` syntax and was [using futures 0.1](https://github.com/panoptix-za/influxdb-rs/blob/5f39c45fa4374631c2483cfd2fd3477bfc8b7ed6/Cargo.toml#L11), not 0.3. Quite a lot has changed since then. – Shepmaster Nov 01 '22 at 18:21
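
For reference, here is a rough sketch of the `fold`-based accumulation suggested in the comment above. This is not the original answer's code; it assumes futures 0.3 and a Tokio runtime (with the `macros` feature enabled), and uses stand-in futures instead of real Hyper requests:

use futures::stream::{self, StreamExt};

const PARALLELISM: usize = 10;

#[tokio::main]
async fn main() {
    // Stand-in work items; in the real case these would be Hyper request futures.
    let iterator_of_futures = (0..100).map(|i| async move {
        if i % 7 == 0 { Err(i) } else { Ok(i) }
    });

    // Run at most `PARALLELISM` futures at a time and fold the results
    // into success and failure buckets as they complete.
    let (successes, failures): (Vec<i32>, Vec<i32>) = stream::iter(iterator_of_futures)
        .buffer_unordered(PARALLELISM)
        .fold((Vec::new(), Vec::new()), |(mut ok, mut err), result| async move {
            match result {
                Ok(v) => ok.push(v),
                Err(e) => err.push(e),
            }
            (ok, err)
        })
        .await;

    println!("{} successes, {} failures", successes.len(), failures.len());
}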