I am writing an asynchronous program for downloading items from peers, and I am trying to keep track of the number of active peers. I have the following implementation:

use futures::future::join_all;
use futures::future::join;
use std::time::Duration;

async fn download(_i: &str) -> std::io::Result<()> {
    // can return io error if connection failed etc.
    tokio::time::sleep(Duration::from_millis(300)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {

    let peers = vec!["one", "two", "three"];
    
    let active_peers = std::sync::Arc::new(std::sync::Mutex::new(0));
    
    let workers = join_all(peers.into_iter().map(|peer| {
        let active_peers = active_peers.clone();
        async move {
            *active_peers.lock().unwrap() += 1;
            download(peer).await.unwrap();
            *active_peers.lock().unwrap() -= 1;
        }
    }));
    
    let result = async move {
        for _ in 0..5 {
           tokio::time::sleep(Duration::from_millis(100)).await;
           println!("active peers {:?}", active_peers.lock().unwrap());
        }
    };
    
    join(workers, result).await; // should start all download workers at the same time
    
    Ok(())
}

output:

active peers 3
active peers 3
active peers 0
active peers 0
active peers 0

playground link

However, I have structures like Vec<u8> that I would prefer to pass by reference rather than clone, as I do here: playground link. I believe Vec<u8>::clone copies the heap allocation unless I wrap the vector in an Rc or Arc. Does wrapping it cause more indirection than sharing a borrow of the vector between the futures?
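
The difference between the two kinds of clone can be checked directly. The following is a minimal sketch using only the standard library; `same_buffer` is a hypothetical helper introduced for illustration, not something from the playground code.

```rust
use std::sync::Arc;

/// Hypothetical helper: true if both slices start at the same heap address.
fn same_buffer(a: &[u8], b: &[u8]) -> bool {
    a.as_ptr() == b.as_ptr()
}

fn main() {
    let v: Vec<u8> = vec![1, 2, 3, 4];

    // Vec::clone allocates a new buffer and copies the bytes into it.
    let copy = v.clone();
    assert!(!same_buffer(&v, &copy));

    // Arc::clone only increments a reference count; both handles
    // point at the same Vec and therefore at the same buffer.
    let shared = Arc::new(v);
    let handle = Arc::clone(&shared);
    assert!(Arc::ptr_eq(&shared, &handle));
    assert!(same_buffer(&shared, &handle));
    assert_eq!(Arc::strong_count(&shared), 2);
}
```

On the indirection question: reading through an Arc<Vec<u8>> follows one pointer to the Arc allocation and a second to the buffer, and reading through a &Vec<u8> likewise follows one pointer to the Vec and a second to the buffer. A &[u8] points at the buffer directly.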

Since I am using join rather than spawn, I am trying to take advantage of the fact that join does not require 'static futures, as mentioned here, i.e. the futures do not need to own the data they use. This is similar to how std::thread::scope works, see playground link.
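
For comparison, here is a minimal sketch of the std::thread::scope behaviour referred to above. It is not the code behind the playground link: `download` and `run` are hypothetical stand-ins, and the counter is an AtomicUsize chosen for brevity rather than the Mutex used in the question.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical stand-in for a download: just sums the shared payload.
fn download(_peer: &str, payload: &[u8]) -> usize {
    payload.iter().map(|&b| b as usize).sum()
}

/// Spawns one scoped thread per peer. The threads borrow `payload` and
/// `active`; nothing is cloned and nothing needs to be 'static.
fn run(peers: &[&str], payload: &[u8], active: &AtomicUsize) -> Vec<usize> {
    thread::scope(|s| {
        // Spawn every thread first so that they can run concurrently.
        let handles: Vec<_> = peers
            .iter()
            .map(|&peer| {
                // `move` copies the references into the thread, not the data.
                s.spawn(move || {
                    active.fetch_add(1, Ordering::SeqCst);
                    let n = download(peer, payload);
                    active.fetch_sub(1, Ordering::SeqCst);
                    n
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let v: Vec<u8> = vec![1, 2, 3, 4];
    let active = AtomicUsize::new(0);

    let sums = run(&["one", "two", "three"], &v, &active);

    assert_eq!(sums, vec![10, 10, 10]);
    assert_eq!(active.load(Ordering::SeqCst), 0);
    // `v` was only borrowed, so it is still usable here.
    println!("{} bytes still owned by main", v.len());
}
```

The scope guarantees that every thread has finished before `run` returns, which is what allows the borrows.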

Is there a way to make this compile without the move keyword? I get the following compile error with just async { ... }:

error[E0373]: async block may outlive the current function, but it borrows `active_peers`, which is owned by the current function
  --> src/main.rs:20:15
   |
20 |           async {
   |  _______________^
21 | |             *active_peers.lock().unwrap() += 1;
   | |              ------------ `active_peers` is borrowed here
22 | |             download(peer).await.unwrap();
23 | |             *active_peers.lock().unwrap() -= 1;
24 | |         }
   | |_________^ may outlive borrowed value `active_peers`
   |

If I remove the constraint of keeping track of the number of active peers, this compiles without any errors:

#[tokio::main]
async fn main() -> std::io::Result<()> {

    let peers = vec!["one", "two", "three"];
    
    let v: Vec<u8> = vec![1, 2, 3, 4];
    
    let workers = join_all(peers.into_iter().map(|peer| {
        async {
            download(peer, &v).await.unwrap();
        }
    }));
    
    let result = async {
        for _ in 0..5 {
           tokio::time::sleep(Duration::from_millis(100)).await;
           println!("active peers {:?}", 0);
        }
    };
    
    join(workers, result).await; // should start all download workers at the same time
    
    Ok(())
}

playground link

Which is intuitively how I would like it to work.

Other suggestions on how to keep track of the number of non-failed futures inside a futures::future::JoinAll are also welcome.

Kevin

1 Answer

The move is only necessary for active_peers. You can just replace v with a reference instead of a clone and it'll work:

let active_peers = active_peers.clone();
let v = &v; // we only need to move a reference in
async move {
    *active_peers.lock().unwrap() += 1;
    download(peer, v).await.unwrap();
    *active_peers.lock().unwrap() -= 1;
}

Playground link
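
The shadowing trick can also be tried without any external crates. The sketch below is a stand-in under several assumptions, not the playground code: `block_on` and `NoopWake` are hypothetical helpers replacing tokio, `download` is a dummy that never suspends, the workers are awaited one after another instead of through join_all, and the counter is a borrowed AtomicUsize rather than an Arc<Mutex<_>>.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient because nothing here returns Pending.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor standing in for tokio. It busy-polls,
/// so it is only suitable for futures that complete immediately.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical download that only reads the borrowed payload.
async fn download(_peer: &str, payload: &[u8]) -> std::io::Result<usize> {
    Ok(payload.len())
}

/// Returns (total bytes "downloaded", active peers left at the end).
fn run(peers: &[&str]) -> (usize, usize) {
    let v: Vec<u8> = vec![1, 2, 3, 4];
    let active_peers = AtomicUsize::new(0);

    let workers: Vec<_> = peers
        .iter()
        .map(|&peer| {
            // Shadow the owned values with references. `async move`
            // then moves only the references into the future.
            let v = &v;
            let active_peers = &active_peers;
            async move {
                active_peers.fetch_add(1, Ordering::SeqCst);
                let n = download(peer, v).await.unwrap();
                active_peers.fetch_sub(1, Ordering::SeqCst);
                n
            }
        })
        .collect();

    // Awaited sequentially here; join_all would poll them concurrently.
    let total = block_on(async move {
        let mut total = 0;
        for w in workers {
            total += w.await;
        }
        total
    });

    // Both values are still owned by this function.
    (total, active_peers.load(Ordering::SeqCst))
}

fn main() {
    assert_eq!(run(&["one", "two", "three"]), (12, 0));
}
```

Since join and join_all do not require 'static futures, the same shadowing may work for the counter in the original program as well, which would make the Arc unnecessary. Treat that as a suggestion to verify rather than a tested result.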

As a side note, taking a &[T] as a function argument is almost always better than taking &Vec<T>.
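
A short illustration of why the slice parameter is more flexible. `checksum` is a hypothetical function made up for this example.

```rust
/// Hypothetical function: accepts any contiguous run of bytes.
fn checksum(data: &[u8]) -> u32 {
    data.iter().map(|&b| u32::from(b)).sum()
}

fn main() {
    let v: Vec<u8> = vec![1, 2, 3, 4];
    let a: [u8; 3] = [5, 6, 7];

    assert_eq!(checksum(&v), 10); // &Vec<u8> coerces to &[u8]
    assert_eq!(checksum(&a), 18); // a reference to an array works too
    assert_eq!(checksum(&v[1..3]), 5); // and so does a sub-slice
}
```

Had `checksum` taken &Vec<u8>, only the first call would compile.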

Aplet123