
I would like to run a function concurrently for many different inputs

Let's assume my function is:

async fn do_the_hard_job(my_input: u16) {
    ...
    // do the hard job that takes some time
    if my_condition {
        println!("{}", my_input);
    }
}

And I would like to run this function for many inputs; let's say my inputs are

let inputs = stream::iter(1..=u16::MAX);

Since the function takes a while to finish, I would like to run as many inputs concurrently as possible. So I could run it like this:

 inputs.for_each_concurrent(0, |input| do_the_hard_job(input))
       .await;

So far so good; I ran all the inputs through the function and got my output on stdout. But what if I want the output to be written to a specific file?

I cannot open a file and append to it inside the do_the_hard_job function; that would mess things up. I cannot pass the file as a parameter either, because the function runs concurrently and only one call at a time could borrow the file mutably.

I have tried returning the value from the function instead of printing it, and then collecting the returned values, like this:

let mut return_values: Vec<u16> = Vec::new();
inputs
    .for_each_concurrent(0, |input| async move {
        let done = do_the_hard_job(input).await;
        if done > 0 {
            return_values.push(input);
        }
    })
    .await;

Sadly, that didn't work. What can I try to achieve my goal?

Edit: I prepared a reproducer for the problem: https://github.com/kursatkobya/bite-sized-qualms/tree/main/concurrent-write

  • The bottleneck has nothing to do with async. The function does a heavy task, so let's say it takes around 2 seconds to complete a single run for a single input value. – Kürşat Kobya Oct 26 '21 at 14:34
  • Well, I guess your `do_the_hard_job` function should return some result, otherwise you cannot retrieve the results themselves. I think there are missing parts here or your implementation is completely wrong. – Netwave Oct 26 '21 at 14:39
  • I have changed that function to return a value, as I mentioned in the latter part of the question. Yet pushing the results into a vector failed. I have added reproducer code here: https://github.com/kursatkobya/bite-sized-qualms/tree/main/concurrent-write – Kürşat Kobya Oct 26 '21 at 15:31
  • It is better if you use the [rust playground](https://play.rust-lang.org/) and not github – Netwave Oct 26 '21 at 16:11
  • You are facing [this problem](https://stackoverflow.com/questions/62557219/error-on-future-generator-closure-captured-variable-cannot-escape-fnmut-closu). The easiest solution would be wrapping the consumer with `Arc<Mutex<_>>`; please check the example on the [playground](https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=1a804a6ab648cb1a26473224841abe0f). For the file, you can use [tokio::fs](https://docs.rs/tokio/1.12.0/tokio/io/trait.AsyncWrite.html#impl-AsyncWrite) to write values to the file asynchronously – Ömer Erden Oct 26 '21 at 17:12
  • Also this post might be useful since it tells why you shouldn't block the async context, please check : https://stackoverflow.com/questions/48735952/why-does-futureselect-choose-the-future-with-a-longer-sleep-period-first – Ömer Erden Oct 26 '21 at 17:16
  • @ÖmerErden the solution you have provided works. Thanks a lot for the information – Kürşat Kobya Oct 26 '21 at 20:02
  • @KürşatKobya maybe you can write up the solution that worked for you as an answer? – Cornelius Roemer May 12 '22 at 00:44

2 Answers


You can combine `then` and `collect` to get the results:

use futures::{stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let inputs = stream::iter(0..=10);

    // `then` runs `f` on each input and `collect` gathers the results
    let output_values: Vec<u16> = inputs
        .then(|input: u16| async move { f(input).await })
        .collect()
        .await;
    println!("{:?}", output_values);
}

async fn f(input: u16) -> u16 {
    tokio::time::sleep(Duration::from_millis(200)).await;
    input
}

Playground

Netwave
  • Thanks for the answer. While this solution fixes the compilation errors, it destroys the concurrency. Maybe it can be combined with another concurrent or multi-threaded approach. The other solution, given in a comment by @OmerErden, worked. I will study both solutions a bit more; thanks for the information and for giving me ideas to build on. – Kürşat Kobya Oct 26 '21 at 20:07

You can use zip() to zip two streams together, pairing each future with a handle to the shared vector:

use std::sync::Arc;
use async_stream::stream;
use futures::{Future, Stream, StreamExt};
use tokio::{sync::Mutex, time::sleep};
fn foo() -> impl Stream<Item = impl Future<Output = i32>> {
    stream! {
        for i in 0..10 {
            yield async move {
                sleep(std::time::Duration::from_millis(1000)).await;
                i
            }
        }
    }
}
#[tokio::main]
async fn main() {
    let s = Box::pin(foo());
    let v = Arc::new(Mutex::new(Vec::<i32>::new()));
    let z = {
        let _v = v.clone();
        s.zip(stream! {
            loop {
                yield _v.clone();
            }
        })
    };

    z.for_each_concurrent(2, |(fut, v)| async move {
        let num = fut.await;
        println!("{}", num);
        v.lock().await.push(num);
    })
    .await;
    println!("{:?}", v);
}
rw YAN