
The async example is useful, but being new to Rust and Tokio, I'm struggling to work out how to make N requests at once, using URLs from a vector, and create an iterator of the response HTML for each URL as a string.

How could this be done?

Shepmaster
user964375

2 Answers


Concurrent requests

As of reqwest 0.11.14:

use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter

Take a collection of strings and convert it into a Stream.

.map(|url| {

StreamExt::map

Run an asynchronous function on every element in the stream and transform the element to a new type.

let client = &client;
async move {

Take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.

let resp = client.get(url).send().await?;

Start an asynchronous GET request using the Client's connection pool and wait for the request to complete.

resp.bytes().await

Request and wait for the bytes of the response.

.buffer_unordered(N);

StreamExt::buffer_unordered

Convert a stream of futures into a stream of those futures' values, executing the futures concurrently.

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

Convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.


Without bounded execution

If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

I'd encourage using the first example as you usually want to limit the concurrency, which buffer and buffer_unordered help with.

Parallel requests

Concurrent requests are generally good enough, but there are times where you need parallel requests. In that case, you need to spawn a task.

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}

The primary differences are:

  • We use tokio::spawn to perform work in separate tasks.
  • We have to give each task its own reqwest::Client. As recommended, we clone a shared client to make use of the connection pool.
  • There's an additional error case when the task cannot be joined.


BinaryButterfly
Shepmaster
  • Why did you use ` stream::iter_ok(urls).map(..)` ? The closure passed to `.map()` returns a future? We should be using `and_then`? – Nawaz Dec 18 '19 at 18:07
  • Or alternatively, when to use `.map()` and when to use `.and_then()` .. both for stream? and for future? – Nawaz Dec 18 '19 at 18:08
  • @Nawaz See the documentation for [`Stream::buffer_unordered`](https://docs.rs/futures/0.1.21/futures/stream/trait.Stream.html#method.buffer_unordered): *If this stream's item can be converted into a future*. See also [What is the difference between `then`, `and_then` and `or_else` in Rust futures?](https://stackoverflow.com/q/55552413/155423) – Shepmaster Dec 18 '19 at 18:12
  • Thanks. I read that. I understand that. The `.map()` part is missing though.. Also, how does that translate to `stream` (not `future`). In case of `stream`, `and_then` is invoked for each _successful_ item in the stream. – Nawaz Dec 18 '19 at 18:15
  • _Convert a stream of futures into a stream of those future's values, executing the futures in parallel._..What if I want `stream of stream of futures`? I've `HashMap>` and I want to fetch each `Url`.. and eventually want `HashMap>` – Nawaz Dec 18 '19 at 18:18
  • I'm trying something like this: `iter_ok(url_map).map(|k, urls| (k, iter_ok(urls).map(..))).buffered_unordered(N).... ` – Nawaz Dec 18 '19 at 18:22
  • @Nawaz please produce a [MRE] then post a new question, linking to this one and describing why it's different. Try to reproduce your error on the [Rust Playground](https://play.rust-lang.org) if possible, otherwise in a brand new Cargo project, then include all of that information to reproduce it in your question, such as versions. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. – Shepmaster Dec 18 '19 at 19:03
  • Is there a reason a stream is used rather than joining a list of futures? – jpsalm Feb 14 '20 at 20:57
  • @jpsalm `StreamExt` has the `buffer` and `buffer_unordered` methods, which you'll almost always want to use to prevent completely unbounded requests from taking up all of your system resources (e.g. open sockets). [You don't need it though](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2a3dcc64380f21e51b063f1c5ed82c2b) – Shepmaster Feb 14 '20 at 21:22
  • 1
    I think this example won't run parallel but only concurrent, based on [this gist](https://gist.github.com/lu4nm3/b8bca9431cdcf19d73040ada13387e58) which I found in [this post](http://patshaughnessy.net/2020/1/20/downloading-100000-files-using-async-rust). – Alex Moore-Niemi Aug 27 '20 at 02:58
  • 1
    @AlexMoore-Niemi fair enough. Updated. – Shepmaster Aug 31 '20 at 14:31
  • Question - can we clone a single client instead of giving each task its own client? – Mark Lodato Dec 09 '20 at 15:20
  • @MarkLodato I'm not sure I follow. When you clone a client you make another client, which you then give to each task as its own. The only difference is how the unique client is created. So I think the answer is "yes". – Shepmaster Dec 09 '20 at 15:47
  • @Shepmaster I'm probably assuming too much but it looks like Client has an inner Arc. Would that imply that cloning it would clone the Arc and allow you to reuse the connection pool (or some wizardry) from the underlying ClientRef? – Mark Lodato Dec 10 '20 at 16:58
  • @MarkLodato yes, that’s what [the documentation recommends](https://docs.rs/reqwest/0.10.9/reqwest/struct.Client.html): *The Client holds a connection pool internally, so it is advised that you create one and reuse it. You do not have to wrap the Client it in an Rc or Arc to reuse it, because it already uses an Arc internally.* – Shepmaster Dec 10 '20 at 17:21
  • @Shepmaster Makes sense! Is there any value in updating this answer to using `clone` instead of `new`ing up for improved copy-ability? – Mark Lodato Dec 10 '20 at 18:05
  • @MarkLodato ah, now I see why you were asking! Yes, I've addressed that. – Shepmaster Dec 10 '20 at 18:23
  • @Shepmaster which of these approaches uses the M:N threading I keep reading so much about? i.e. where the tokio runtime will create concurrent threads but also create parallel threads where necessary – Dragoon Feb 20 '21 at 02:42
  • @Dragoon the one under *Parallel requests*. `tokio::spawn` creates new tasks, and tasks can be moved across threads (when Tokio is using a [multithreaded scheduler](https://docs.rs/tokio/1.2.0/tokio/runtime/index.html#multi-thread-scheduler)) – Shepmaster Feb 20 '21 at 03:21

If it fits your problem, I recommend using async-std and rayon. They are both mature now and really easy to get started with, given the `async { /* code here */ }` block syntax. You can also work into/alongside Tokio with feature integration: https://docs.rs/async-std/1.10.0/async_std/#features

Bots Fab