I am writing an asynchronous program for downloading items from peers, and I am trying to keep track of the number of active peers. I have the following implementation:
use futures::future::join_all;
use futures::future::join;
use std::time::Duration;

async fn download(_i: &str) -> std::io::Result<()> {
    // can return io error if connection failed etc.
    tokio::time::sleep(Duration::from_millis(300)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let peers = vec!["one", "two", "three"];
    let active_peers = std::sync::Arc::new(std::sync::Mutex::new(0));
    let workers = join_all(peers.into_iter().map(|peer| {
        let active_peers = active_peers.clone();
        async move {
            *active_peers.lock().unwrap() += 1;
            download(peer).await.unwrap();
            *active_peers.lock().unwrap() -= 1;
        }
    }));
    let result = async move {
        for _ in 0..5 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            println!("active peers {:?}", active_peers.lock().unwrap());
        }
    };
    join(workers, result).await; // should start all download workers at the same time
    Ok(())
}
Output:
active peers 3
active peers 3
active peers 0
active peers 0
active peers 0
However, I have structures like Vec<u8>, which I would prefer to pass by reference instead of doing it like this: playground link. I believe running Vec<u8>::clone will make a copy of the heap allocation, unless I wrap the vector in an Rc or Arc. Does that cause more indirection than sharing a borrow of the vector between the futures?
Since I am using join instead of spawn, I am trying to take advantage of the fact that join does not require 'static futures, as mentioned here, i.e. the futures do not need to own the data they use. This is similar to how std::thread::scope works, see playground link.
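To make the comparison concrete, here is a minimal sketch of the scoped-thread pattern, using only the standard library. The names `process` and `run_scoped` are hypothetical stand-ins rather than code from the linked playground: every thread borrows the same `Vec<u8>` contents and the same counter, and nothing is cloned or wrapped in an Arc.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical stand-in for a download: sums the shared bytes.
fn process(data: &[u8]) -> u32 {
    data.iter().map(|&b| u32::from(b)).sum()
}

/// Runs one scoped thread per peer. Every thread borrows `data` and
/// `finished`; the scope guarantees all threads are joined before it returns.
/// Returns (number of finished workers, per-worker sums).
fn run_scoped(peers: &[&str], data: &[u8]) -> (usize, Vec<u32>) {
    let finished = AtomicUsize::new(0);
    let sums = thread::scope(|s| {
        let handles: Vec<_> = peers
            .iter()
            .map(|_peer| {
                s.spawn(|| {
                    let sum = process(data); // borrowed, not cloned
                    finished.fetch_add(1, Ordering::SeqCst);
                    sum
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect::<Vec<u32>>()
    });
    (finished.load(Ordering::SeqCst), sums)
}
```

Calling `run_scoped(&["one", "two", "three"], &[1, 2, 3, 4])` borrows the byte slice in all three threads; this is the behaviour I would like to reproduce with futures.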
Is there a way to make this compile without the move keyword? I get the following compile error with just async { ... }:
error[E0373]: async block may outlive the current function, but it borrows `active_peers`, which is owned by the current function
  --> src/main.rs:20:15
   |
20 |           async {
   |  _______________^
21 | |             *active_peers.lock().unwrap() += 1;
   | |              ------------ `active_peers` is borrowed here
22 | |             download(peer).await.unwrap();
23 | |             *active_peers.lock().unwrap() -= 1;
24 | |         }
   | |_________^ may outlive borrowed value `active_peers`
   |
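For comparison, one variation that seems to satisfy the borrow checker keeps `move` but moves only shared references into the block, so the counter itself stays borrowed. The sketch below is an assumption-laden illustration, not the program above: `fake_download`, `run_borrowed`, and the tiny `block_on` executor are hypothetical, the counter is an `AtomicUsize` instead of an `Arc<Mutex<_>>`, and the futures are driven one after another rather than through `join_all`, so that the example needs only the standard library.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Minimal executor for this sketch only: polls a future until it is ready.
/// Real code would rely on the tokio runtime instead.
fn block_on<F: Future>(fut: F) -> F::Output {
    struct NoopWaker;
    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for `download`; completes immediately.
async fn fake_download(_peer: &str, data: &[u8]) -> std::io::Result<usize> {
    Ok(data.len())
}

/// Builds one future per peer. Each `async move` block moves only the
/// references `active`, `completed`, and `data`, never the owned values.
/// Returns (active count after all workers ran, number of completed workers).
fn run_borrowed(peers: Vec<&str>, data: &[u8]) -> (usize, usize) {
    let active = AtomicUsize::new(0);
    let completed = AtomicUsize::new(0);
    let workers: Vec<_> = peers
        .into_iter()
        .map(|peer| {
            let active = &active; // shadow the counter with a shared reference
            let completed = &completed;
            async move {
                active.fetch_add(1, Ordering::SeqCst);
                fake_download(peer, data).await.unwrap();
                active.fetch_sub(1, Ordering::SeqCst);
                completed.fetch_add(1, Ordering::SeqCst);
            }
        })
        .collect();
    // Driven sequentially here; with tokio one would pass `workers` to `join_all`.
    for worker in workers {
        block_on(worker);
    }
    (active.load(Ordering::SeqCst), completed.load(Ordering::SeqCst))
}
```

This still uses the `move` keyword, so it does not answer the question as asked; it only shows that `move` does not have to imply cloning or owning the data.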
If I remove the requirement of keeping track of the number of active peers, this compiles without any errors:
// NOTE: in this variant `download` is assumed to take the shared bytes as a
// second argument, e.g. `async fn download(_i: &str, _v: &[u8]) -> std::io::Result<()>`.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let peers = vec!["one", "two", "three"];
    let v: Vec<u8> = vec![1, 2, 3, 4];
    let workers = join_all(peers.into_iter().map(|peer| {
        async {
            download(peer, &v).await.unwrap();
        }
    }));
    let result = async {
        for _ in 0..5 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            println!("active peers {:?}", 0);
        }
    };
    join(workers, result).await; // should start all download workers at the same time
    Ok(())
}
This is intuitively how I would like it to work.
Other suggestions on how to keep track of the number of non-failed futures inside a futures::future::JoinAll are also welcome.
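One direction that might be worth considering is a drop guard, so that the counter is decremented even when a download fails or returns early. The sketch below is a synchronous, standard-library-only illustration: `ActiveGuard` and `guarded_download` are hypothetical names, and the guard borrows an `AtomicUsize` instead of locking a Mutex. Inside an async block the guard would be an ordinary local, which is dropped when the future completes or is itself dropped.

```rust
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical guard: increments the counter on creation, decrements on drop.
struct ActiveGuard<'a> {
    counter: &'a AtomicUsize,
}

impl<'a> ActiveGuard<'a> {
    fn new(counter: &'a AtomicUsize) -> Self {
        counter.fetch_add(1, Ordering::SeqCst);
        ActiveGuard { counter }
    }
}

impl Drop for ActiveGuard<'_> {
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Synchronous stand-in for `download`; fails when `fail` is true.
/// On success, returns the active count observed while the guard was alive.
fn guarded_download(active: &AtomicUsize, fail: bool) -> io::Result<usize> {
    let _guard = ActiveGuard::new(active);
    if fail {
        // Early return: `_guard` is dropped here, so the count is restored.
        return Err(io::Error::new(io::ErrorKind::Other, "connection failed"));
    }
    Ok(active.load(Ordering::SeqCst))
}
```

With this shape, a failed download cannot leave the count stuck at a non-zero value, which the increment/decrement pair around `download(peer).await.unwrap()` does not guarantee if the unwrap panics.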