Using the futures crate.

I have a vec of futures which return a bool and I want to wait specifically for the future that returns true.

Consider the following pool of futures:

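use rand::Rng; // brings gen_range into scope

async fn async_function(guess: u8) -> bool {
    let random_wait = rand::thread_rng().gen_range(0..2);
    // Note: this is a blocking sleep; it stalls the thread that polls the future.
    std::thread::sleep(std::time::Duration::from_secs(random_wait));
    println!("Running guess {guess}");
    guess == 231
}

fn main() {
    // `impl Trait` is not allowed in a `let` type annotation, so the element type is inferred.
    let mut pool = Vec::new();
    for guess in 0..=255 {
        pool.push(async_function(guess));
    }
}
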
  • How do I wait for the futures in the vec?
  • Is it possible to wait until only one future returns true?
  • Can I identify the value of guess for the future that returns true?

I'm new to async Rust, so I've been looking at the async-book.

From there, these are the options I've considered:

  • join! waits until all futures are done, so that doesn't work for me since I want to drop the remaining futures.

  • select! doesn't seem to be an option, because I need to name each future in the select block and I'm not about to write a 256-arm select.

  • try_join! is tempting me to break semantics and have my async_function return Err(guess) so that it causes the try_join to exit and return the value I want.

  • I tried using async_function(guess).boxed().into_stream() and then using select_all from futures::stream, but it doesn't seem to run concurrently. I see my async functions running in order.

Daniel Garcia

1 Answer

OK, my understanding of futures was wrong. I knew that they weren't executed immediately, but I wasn't using the executors from the futures crate correctly.

Here's what I've got that seems to work.

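use futures::executor::{block_on_stream, ThreadPool}; // ThreadPool needs the "thread-pool" feature
use futures::future::FutureExt; // for into_stream
use futures::task::SpawnExt; // for spawn_with_handle

let thread_pool = ThreadPool::new().unwrap();
let mut pool = vec![];
for guess in 0..=255 {
    // Each future is handed to the pool; the handle resolves to its result.
    let thread = thread_pool.spawn_with_handle(async_function(guess)).expect("Failed to spawn thread");
    pool.push(thread.into_stream());
}

// Merge all handles into one stream and block the main thread on each result.
let stream = block_on_stream(futures::stream::select_all(pool));
for value in stream {
    println!("Got value {value}");
}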

The thread pool executor is what creates the separate threads needed to run the futures in parallel. Without it my application was single-threaded, so no matter what I tried, the functions ran one at a time, not concurrently.

This way I spawn each request into the thread pool. The thread pool appears to spawn 4 background threads (by default it creates one worker thread per CPU core). By turning each handle into a stream, merging them with select_all, and iterating over the merged stream, my main thread blocks until a new value is available.

There are always 4 workers, and the thread pool schedules the tasks in the order they were requested, like a queue. This is perfect.

Daniel Garcia
  • If you use a runtime (like async-std or tokio), it will provide async timers to use instead of a blocking sleep, and you would see the concurrency futures are designed for. Futures are not designed for CPU-driven work, or really even blocking I/O. For CPU-driven work look at rayon, whilst for I/O try to use non-blocking I/O where possible. – user1937198 Feb 10 '23 at 01:19
  • Thanks, rayon looks more like what I was looking for. But you're right, since I'm using a thread pool here I don't think I really even needed futures. The runtimes are for managing async/await, but my use case doesn't need a lot of parallelization. I just had one function I needed to run in parallel over a range of inputs and this did the trick. – Daniel Garcia Feb 10 '23 at 18:10
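
For the case described in these comments, where one blocking function is run in parallel over a range of inputs, futures may not be needed at all. Below is a minimal sketch using only std threads and a channel. The names check and first_true_guess and the worker count are assumptions for illustration, and check stands in for the real blocking work.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the blocking check from the question.
fn check(guess: u8) -> bool {
    guess == 231
}

// Splits 0..=255 across `workers` threads and returns the first
// guess reported as true, or None if no guess matches.
fn first_true_guess(workers: usize) -> Option<u8> {
    let workers = workers.max(1); // avoid dividing by zero below
    let (tx, rx) = mpsc::channel();
    let guesses: Vec<u8> = (0..=255).collect();
    let chunk_size = (guesses.len() + workers - 1) / workers;

    for chunk in guesses.chunks(chunk_size) {
        let tx = tx.clone();
        let chunk = chunk.to_vec();
        thread::spawn(move || {
            for guess in chunk {
                // `send` fails once the receiver is gone, so workers stop early.
                if tx.send((guess, check(guess))).is_err() {
                    break;
                }
            }
        });
    }
    // Drop the original sender so the receiver ends when all workers finish.
    drop(tx);

    // Block until a result with `true` arrives; this consumes and drops `rx`.
    rx.into_iter()
        .find(|&(_, is_match)| is_match)
        .map(|(guess, _)| guess)
}

fn main() {
    match first_true_guess(4) {
        Some(guess) => println!("Guess {guess} returned true"),
        None => println!("No guess returned true"),
    }
}
```

Sending the guess along with the result is what lets the receiver identify the winning input. Dropping the receiver after the first match is what tells the remaining workers to stop.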