0

I need to download a lot of files, but I want to process only 3 files at the same time. I'm receiving the next file URL from a Tokio MPSC channel. I need to cover the case when I'm already downloading 2 files and waiting for a new one.

How can I wait for one of 3 futures and extend a previous select_all?

A very simplified example: playground

use futures::{
    future::{select, select_all, Either},
    FutureExt,
};
use tokio::time::{delay_for, Duration};

async fn exec(val: usize) {
    println!("Started {}", val);
    delay_for(Duration::from_millis(100)).await;
    println!("Executed {}", val);
}

async fn get_next() -> usize {
    use rand::Rng;
    delay_for(Duration::from_millis(100)).await;
    rand::thread_rng().gen_range(0, 10)
}

#[tokio::main]
async fn main() {
    let mut tasks = Vec::new();
    for _ in 0..10 {
        if tasks.len() == 0 {
            let next = get_next().await;
            tasks.push(tokio::spawn(exec(next)));
            continue;
        }

        let all = select_all(tasks.drain(..));
        let next = get_next().boxed();

        match select(all, next).await {
            Either::Left(((_, _, left), _)) => tasks = left,
            Either::Right((a, _other)) => {
                // How can I get tasks futures from 'other'?
                // So on the next iteration, I'll be able to do select with an additional task.
                tasks.push(tokio::spawn(exec(a)));
            }
        };
    }
}

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Alex
  • 49
  • 2
  • 6
  • 1
    It looks like your question might be answered by the answers of [How can I perform parallel asynchronous HTTP GET requests with reqwest?](https://stackoverflow.com/q/51044467/155423). If not, please **[edit]** your question to explain the differences. Otherwise, we can mark this question as already answered. – Shepmaster Aug 19 '20 at 19:08
  • 1
    Specifically, [`StreamExt::buffer_unordered`](https://docs.rs/futures/0.3.5/futures/stream/trait.StreamExt.html#method.buffer_unordered). – Shepmaster Aug 19 '20 at 19:09
  • @Shepmaster , Oh... I thought that receiver doesn't imply StreamExt, but I just noticed that tokio needs the explicit feature for streams. Thank you! – Alex Aug 19 '20 at 21:37
  • Unfortunately, this function also doesn't exist in [tokio docs](https://docs.rs/tokio/0.2.22/tokio/stream/trait.StreamExt.html) – Alex Aug 19 '20 at 21:43

0 Answers0