I am very new to Rust and am trying to recreate C#'s Parallel.ForEach method in Rust.

C# code

Parallel.ForEach(collection, x => {
  // do something with x 
})

Rust Code

pub async fn parallel_run<T>(collection: Vec<T>, callback: fn(item: T) -> dyn Future<Output = ()>) {
    
  for item in collection  
  {
       
    tokio::spawn(callback(item)).await.expect("TODO: panic message");
  }

}


Compile Error

the size for values of type dyn Future<Output = ()> cannot be known at compilation time [E0277]
doesn't have a size known at compile-time
Help: the trait Sized is not implemented for dyn Future<Output = ()>
Note: required by a bound in tokio::spawn

dyn Future<Output = ()> cannot be sent between threads safely [E0277]
Help: the trait Send is not implemented for dyn Future<Output = ()>
Note: required by a bound in tokio::spawn

What am I missing?

I need to recreate an equivalent of Parallel.ForEach in Rust.

Theodor Zoulias
Sunil Kumar
  • I am not particularly familiar with C#, but based on the naming alone, it sounds like what you're looking for is `rayon` – Ivan C May 06 '23 at 07:39
  • Does this answer your question? [How can I perform parallel asynchronous HTTP GET requests with reqwest?](https://stackoverflow.com/questions/51044467/how-can-i-perform-parallel-asynchronous-http-get-requests-with-reqwest) – xamgore May 06 '23 at 07:44
  • You missed that you can't ever have a bare `dyn Future` on the stack (that includes function return values), you need some indirection (a reference, `Box`, `Rc`, `Arc`, ...) – cafce25 May 06 '23 at 08:18
  • Does this answer your question? [How can I fix the error E0277: the trait bound \`\[usize\]: std::marker::Sized\` is not satisfied?](https://stackoverflow.com/questions/51650259/how-can-i-fix-the-error-e0277-the-trait-bound-usize-stdmarkersized-is) – cafce25 May 06 '23 at 08:21
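The indirection mentioned in the comments above can be sketched as follows. This is a minimal illustration, not the asker's final code: the callback returns a `Pin<Box<dyn Future<Output = ()> + Send>>` (aliased here as the hypothetical name `BoxedTask`), which has a known size and is `Send`. To keep the sketch dependency-free, a tiny hand-written `block_on` and plain OS threads stand in for the tokio runtime; with tokio, the same boxed future could be handed to `tokio::spawn` instead.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical alias: the `Box` supplies the indirection, so the return type
// has a known size even though the concrete future type is erased.
type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send>>;

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor for one future (an assumption of this sketch, standing in
// for a real runtime such as tokio).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Starts one thread per item first, then waits for all of them, so the
// tasks actually overlap instead of running one after another.
fn parallel_run<T>(collection: Vec<T>, callback: impl Fn(T) -> BoxedTask) {
    let handles: Vec<_> = collection
        .into_iter()
        .map(|item| {
            let task = callback(item);
            thread::spawn(move || block_on(task))
        })
        .collect();
    for handle in handles {
        handle.join().expect("task panicked");
    }
}

fn main() {
    let total = Arc::new(AtomicI32::new(0));
    let shared = Arc::clone(&total);
    parallel_run(vec![1, 2, 3], move |x| -> BoxedTask {
        let total = Arc::clone(&shared);
        Box::pin(async move {
            total.fetch_add(x, Ordering::SeqCst);
        })
    });
    assert_eq!(total.load(Ordering::SeqCst), 6);
}
```

Note that the loop in the question awaits each spawned task before starting the next one, which serializes the work; collecting the handles first and joining afterwards, as above, avoids that.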

1 Answer

If you only need to run a calculation (a CPU-bound task) over the collection, you can use rayon.

Example that filters and squares values in parallel:

use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

fn my_complex_calculation(v: i32) -> i32 {
    v * v
}

fn main() {
    let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    // Keep the even values, then square them on rayon's thread pool.
    let updated: Vec<i32> = values
        .par_iter()
        .filter(|&&x| x & 1 == 0)
        .copied()
        .map(my_complex_calculation)
        .collect();
    assert_eq!(*updated, [4, 16, 36, 64, 100]);
}
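For comparison, the fork-join shape of `Parallel.ForEach` can also be sketched with the standard library alone. This is only an illustration under stated assumptions: `parallel_for_each` and its `workers` parameter are hypothetical names, and the sketch splits the slice into fixed chunks with one scoped thread each, which is much cruder than rayon's work-stealing pool.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

// Hypothetical helper: applies `f` to every item, splitting the slice into
// roughly equal chunks and running each chunk on its own scoped thread.
fn parallel_for_each<T: Sync>(items: &[T], workers: usize, f: impl Fn(&T) + Sync) {
    if items.is_empty() {
        return;
    }
    let workers = workers.max(1);
    // Round up so that every item lands in some chunk.
    let chunk_len = (items.len() + workers - 1) / workers;
    thread::scope(|scope| {
        for chunk in items.chunks(chunk_len) {
            let f = &f;
            scope.spawn(move || chunk.iter().for_each(f));
        }
        // All scoped threads are joined when the scope ends.
    });
}

fn main() {
    let values: Vec<u64> = (1..=10).collect();
    let sum_of_squares = AtomicU64::new(0);
    parallel_for_each(&values, 4, |&v| {
        sum_of_squares.fetch_add(v * v, Ordering::Relaxed);
    });
    assert_eq!(sum_of_squares.load(Ordering::Relaxed), 385);
}
```

Because the threads are scoped, the closure can borrow local data such as `sum_of_squares` without an `Arc`.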

If you need to run several async tasks in parallel and wait for all of them, you can use JoinSet from tokio.

use tokio::task::JoinSet;

async fn my_network_request(v: i32) -> i32 {
    // Write your code here.
    v * v
}

#[tokio::main]
async fn main() {
    let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let mut results = vec![0; values.len()];
    let mut join_set = JoinSet::new();
    // Spawn one task per value, tagging each with its index.
    for (i, &v) in values.iter().enumerate() {
        join_set.spawn(async move { (i, my_network_request(v).await) });
    }
    // Tasks finish in arbitrary order; the index restores the original order.
    while let Some(res) = join_set.join_next().await {
        let (idx, val) = res.unwrap();
        results[idx] = val;
    }
    assert_eq!(*results, [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
}

Also, @chayim-friedman suggested using futures::future::join_all in the comments. Note that, unlike the previous two suggestions, this does not spread computational work across CPU cores: join_all polls all the futures concurrently within a single task.

use futures::future::join_all;

async fn my_network_request(v: i32) -> i32 {
    // Write your code here.
    v * v
}

#[tokio::main]
async fn main() {
    let values = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let results = join_all(values.iter().copied().map(my_network_request)).await;
    assert_eq!(*results, [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
}
  • Usually, there is no reason to spawn tokio tasks for things like sending multiple network requests and this is heavier than needed. Instead, use something like [`futures::future::join_all()`](https://docs.rs/futures/latest/futures/future/fn.join_all.html). – Chayim Friedman May 06 '23 at 17:42