-2

How can I run this async function several times concurrently?

I tried using threads, but then I get the error "borrowed data escapes outside of function".

// Sequential version: each call is awaited to completion before the next
// iteration begins. Argument types, in order: six `&str` values
// (chapter_hash, image, name, title, vol, chapter) followed by `item: usize`.
for item in 0..10 {
    download_image(chapter_hash, image, name, title, vol, chapter, item).await;
}

What I would like to do is run all of these calls at the same time.

EDIT

I added some comments; I hope you can make heads or tails of the code.

It simply searches the JSON for data and downloads the files that this data refers to.

async fn download_chapter(
    json: String,
    name: &str,
    title: &str,
    vol: &str,
    chapter: &str,
    folder_name: &str,
) {
    match serde_json::from_str(&json) {
        Ok(json_value) => match json_value {
            Value::Object(obj) => {
                if let Some(data_array) = obj.get("chapter") {
                    // checks if chapter is in the json and store it in value
                    if let Some(chapter_hash) = data_array.get("hash").and_then(Value::as_str) {
                        // checks if hash is in chapter and store it in value
                        if let Some(images1) = data_array.get("data").and_then(Value::as_array) {
                            // same but with data
                            let images_length = images1.len(); // .and_then(Value::as_array) if just for length
                            if let Some(images) = data_array.get("data") {
                                // this json is [{...}, {...}]
                                let folder_path =
                                    format!("{} - {}Ch.{} - {}", name, vol, chapter, title);

                                // Code for creating folder

                                let pb = ProgressBar::new(images_length as u64);

                                pb.set_style(
                                        ProgressStyle::default_bar()
                                            .template("   [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) {msg}")
                                            .progress_chars("#>-"),
                                    );

                                for item in 0..images_length {
                                    if let Some(image) = images.get(item) {
                                        // Formatting file path
                                        let file_name = format!(
                                            "{} - {}Ch.{} - {} - {}.jpg",
                                            name,
                                            vol,
                                            chapter,
                                            title,
                                            &item + 1
                                        );
                                        let file_path = format!(
                                            "{}",
                                            file_name
                                                .replace('<', "")
                                                .replace('>', "")
                                                .replace(':', "")
                                                .replace('|', "")
                                                .replace('?', "")
                                                .replace('*', "")
                                                .replace('/', "")
                                                .replace('\\', "")
                                                .replace('"', "")
                                                .replace('\'', "")
                                        );

                                        // The function that can run in threads
                                        download_image(
                                            chapter_hash,
                                            image,
                                            name,
                                            title,
                                            vol,
                                            chapter,
                                            item,
                                        )
                                        .await;
                                        pb.inc(1);
                                    } else {
                                        println!("Image not found")
                                    }
                                }
                                pb.finish_with_message("Done downloading");
                            } else {
                                println!("Missing data for chapter")
                            }
                        } else {
                            println!("Missing data for chapter")
                        }
                    } else {
                        println!("Chapter number missing")
                    }
                } else {
                    println!("  JSON does not contain a 'chapter' array.");
                }
            }
            _ => {
                println!("  JSON is not an object.");
            }
        },
        Err(err) => println!("  Error parsing JSON: {}", err),
    }
}

As you suggested, I changed it, and it now works just as expected.

It runs at most 12 threads at a time, probably because I have a 12-thread processor.

// Spawn one tokio task per image. Every borrowed argument is copied into an
// owned `String` first, and the `async move` block takes those copies with it,
// so the spawned task holds no references into this function.
let tasks = (0..images_length).filter_map(|item| {
    // An index without an image is skipped instead of being turned into a
    // placeholder download with empty arguments.
    let image_tmp = images.get(item)?;
    let image_temp = image_tmp.to_string();

    // code for creating directory

    let chapter_hash = chapter_hash.to_string();
    // NOTE(review): presumably `image_tmp` is a JSON string whose `to_string()`
    // output is wrapped in double quotes, hence the trim — confirm.
    let image = image_temp.trim_matches('"').to_string();
    let name = name.to_string();
    let title = title.to_string();
    let vol = vol.to_string();
    let chapter = chapter.to_string();

    Some(tokio::spawn(async move {
        download_image(&chapter_hash, &image, &name, &title, &vol, &chapter, item).await;
    }))
});

// The iterator above is lazy: `join_all` consumes it (spawning every task)
// and then waits until all of them have finished.
for outcome in futures::future::join_all(tasks).await {
    // An `Err` here is the task's join error; report it instead of dropping it.
    if let Err(err) = outcome {
        eprintln!("Download task failed: {}", err);
    }
}
Hovercraft
  • 17
  • 5
  • Why do you need a reference to a `usize`? – tadman Aug 30 '23 at 17:00
  • 1
    Oops, my bad: in the definition I had `item: &usize`. – Hovercraft Aug 30 '23 at 17:05
  • Does this answer your question? [How can I pass a reference to a stack variable to a thread?](/q/32750829/2189130) – kmdreko Aug 30 '23 at 17:10
  • oh wait, you have a `.await` so that doesn't really help – kmdreko Aug 30 '23 at 17:14
  • 3
    probably just want [`futures::future::join_all`](https://docs.rs/futures/latest/futures/future/fn.join_all.html) – kmdreko Aug 30 '23 at 17:21
  • 1
    Use `map`, then `join_all` as kmdreko is suggesting. As for borrowing, you may need to wrap in `Arc` to make it shared. – tadman Aug 30 '23 at 17:24
  • 1
    Please post a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) to help us help you, as well as the whole error from `cargo check` (not your IDE). – Chayim Friedman Aug 30 '23 at 18:23
  • 1
    I'm quite certain your question is an [XY problem](https://xyproblem.info/). You are asking for threads, and yet you have an async function there. So what you are probably trying to ask is "How do I run multiple **tasks** asynchronously, whilst waiting for all of them to finish in the end?" Spawning threads manually is almost always wrong when dealing with async code. That's the job of the reactor, like `tokio`, and it does that internally without the user having to do anything. And yes, your answer is most likely `futures::future::join_all`, as @kmdreko already stated. – Finomnis Aug 31 '23 at 06:00
  • But how to do that exactly, or if any borrow checker problems still exist, is impossible to say from the minimal amount of code you provide. I agree with @ChayimFriedman, please provide a [mre]. – Finomnis Aug 31 '23 at 06:02
  • The important question here is: Are the `&str`s `&'static str`s or `&'a str`s? – Finomnis Aug 31 '23 at 06:03

1 Answer

1

With async functions, don't use threads. They are the wrong tool; the async reactor (like tokio) already spawns threads for you.

You just have to tell the reactor that you want to execute multiple tasks asynchronously, like with futures::future::join_all.

Here is an example that is loosely based on your code:

use std::thread;
use std::time::Duration;

/// Demo task: prints the current thread id together with its arguments,
/// sleeps for 100 ms via tokio's async timer, then prints the thread id again.
async fn do_one(name: &str, item: usize, i: usize) {
    let started_on = thread::current().id();
    println!("{:?}: Do: {}: {}, {}", started_on, i, name, item);

    let pause = Duration::from_millis(100);
    tokio::time::sleep(pause).await;

    // Looked up again after the await rather than reusing `started_on`.
    let finished_on = thread::current().id();
    println!("{:?}: {} done.", finished_on, i);
}

/// Creates ten `do_one` futures (for `i` in `0..10`) and awaits them all
/// together through `join_all`, without spawning any tasks.
async fn do_many(name: &str, item: usize) {
    let pending = (0..10).map(move |i| do_one(name, item, i));
    futures::future::join_all(pending).await;
}

/// Entry point: runs the demo once with fixed sample arguments.
#[tokio::main]
async fn main() {
    let (name, item) = ("foo", 42);
    do_many(name, item).await;
}
ThreadId(1): Do: 0: foo, 42
ThreadId(1): Do: 1: foo, 42
ThreadId(1): Do: 2: foo, 42
ThreadId(1): Do: 3: foo, 42
ThreadId(1): Do: 4: foo, 42
ThreadId(1): Do: 5: foo, 42
ThreadId(1): Do: 6: foo, 42
ThreadId(1): Do: 7: foo, 42
ThreadId(1): Do: 8: foo, 42
ThreadId(1): Do: 9: foo, 42
ThreadId(1): 0 done.
ThreadId(1): 1 done.
ThreadId(1): 2 done.
ThreadId(1): 3 done.
ThreadId(1): 4 done.
ThreadId(1): 5 done.
ThreadId(1): 6 done.
ThreadId(1): 7 done.
ThreadId(1): 8 done.
ThreadId(1): 9 done.

Additional remark:

futures::future::join_all does not cause parallelism (as can be seen by the printed thread id). So far, it's single-threaded asynchronism. To allow tokio to schedule the tasks onto multiple threads, you need to tokio::spawn them first.

This, however, is not possible with arguments that have non-'static lifetimes attached to them, like your &strs (I assume that they are not &'static).

You could solve this by copying the &strs to owned Strings:

use std::thread;
use std::time::Duration;

/// Demo task taking an owned `name`: prints the current thread id with its
/// arguments, sleeps for 100 ms via tokio's async timer, then prints the
/// thread id it is running on at that point.
async fn do_one(name: String, item: usize, i: usize) {
    let thread_before = thread::current().id();
    println!("{:?}: Do: {}: {}, {}", thread_before, i, name, item);

    let delay = Duration::from_millis(100);
    tokio::time::sleep(delay).await;

    // Queried afresh after the await; the sample output shows it can differ.
    let thread_after = thread::current().id();
    println!("{:?}: {} done.", thread_after, i);
}

/// Spawns ten `do_one` tasks with `tokio::spawn` and waits for all of them.
///
/// Each task receives its own `String` copy of `name`, so the spawned future
/// does not borrow from this function.
async fn do_many(name: &str, item: usize) {
    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        let owned_name = name.to_string();
        handles.push(tokio::spawn(do_one(owned_name, item, i)));
    }

    futures::future::join_all(handles).await;
}

/// Entry point: runs the spawning demo once with fixed sample arguments.
#[tokio::main]
async fn main() {
    let (name, item) = ("foo", 42);
    do_many(name, item).await;
}
ThreadId(2): Do: 0: foo, 42
ThreadId(4): Do: 1: foo, 42
ThreadId(4): Do: 2: foo, 42
ThreadId(4): Do: 3: foo, 42
ThreadId(3): Do: 5: foo, 42
ThreadId(2): Do: 7: foo, 42
ThreadId(3): Do: 6: foo, 42
ThreadId(3): Do: 8: foo, 42
ThreadId(2): Do: 9: foo, 42
ThreadId(4): Do: 4: foo, 42
ThreadId(3): 4 done.
ThreadId(5): 5 done.
ThreadId(5): 0 done.
ThreadId(3): 6 done.
ThreadId(5): 1 done.
ThreadId(5): 2 done.
ThreadId(5): 8 done.
ThreadId(4): 9 done.
ThreadId(3): 7 done.
ThreadId(2): 3 done.

Note that the thread ids now indicate that this actually runs on multiple threads. Also note that some tasks run on the same thread - tokio::spawn does not cause the spawning of an OS thread. Rather, all tasks spawned by tokio::spawn get distributed across the runtime's existing pool of worker threads (whose size, by default, matches the number of CPU cores the system has).

For more information, read the chapter about spawning in tokio's tutorial.

Finomnis
  • 18,094
  • 1
  • 20
  • 27