
I am experimenting with the Tokio runtime in Rust by writing a web crawler.

Right now I am spawning a separate task for each link to crawl, since past a certain point parallelism seems to work better than simple concurrency. All the communication is done through channels, and nothing is returned from the functions. So I am wondering: what would be a good way to put a limit on the number of tasks spawned?

I was using the method given in this answer while I was still using function returns, but it does not seem very useful with channels.
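
For reference, that method looks roughly like this: spawned tasks are driven through `buffer_unordered`, which caps how many are in flight at once (a minimal standalone sketch with placeholder URLs and limit, not my actual crawler code):

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let urls = vec!["https://example.com/a", "https://example.com/b"];
    let task_limit = 8; // at most this many spawned tasks in flight at once

    stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            // Spawning lets requests run in parallel across runtime threads;
            // buffer_unordered below caps how many are alive at a time.
            tokio::spawn(async move { client.get(url).send().await?.bytes().await })
        })
        .buffer_unordered(task_limit)
        .for_each(|result| async {
            match result {
                Ok(Ok(bytes)) => println!("got {} bytes", bytes.len()),
                Ok(Err(e)) => eprintln!("request error: {}", e),
                Err(e) => eprintln!("task failed: {}", e),
            }
        })
        .await;
}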

I also thought about simply taking a limited number of links from to_crawl at a time, but that makes it difficult to keep track of depth.

I can't just move the channel-receiving part to a separate task, since I am currently using mutable HashSets to keep track of links, and that might get messy with mutex locking and unlocking.

Anyway, I would also love to hear other people's thoughts on whether channels should even be used over function returns in a program like this, because I pretty much decided to do that on a whim after hearing this talk by Rich Hickey.

The main crawler is something like this:

// Imports assumed for this snippet (`Link` and `crawl_page` come from elsewhere in the crate):
use std::collections::HashSet;

use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
use url::Url;

pub async fn crawl_with_depth(
    origin_url: Link,
    crawl_depth: usize,
    whitelist: Option<HashSet<String>>,
    blacklist: Option<HashSet<String>>,
    tx_output: mpsc::Sender<Link>,
    task_limit: usize,
) {
    let mut to_crawl: HashSet<Url> = HashSet::new();
    let mut crawled: HashSet<Url> = HashSet::new();
    let mut dont_crawl: HashSet<Url> = HashSet::new();
    let client = reqwest::Client::new();

    to_crawl.insert(origin_url.url);

    for _ in 0..crawl_depth {
        println!("Crawling {} URls", to_crawl.len());

        let (tx_crawler, mut rx_crawler) = mpsc::channel::<Link>(task_limit);

        stream::iter(to_crawl.clone())
            .for_each(|x| async {
                let tx_clone = tx_crawler.clone();
                let client_clone = client.clone();
                tokio::spawn(async move { crawl_page(x, client_clone, tx_clone).await });
            })
            .await;

        to_crawl.clear();

        drop(tx_crawler);

        while let Some(link) = rx_crawler.recv().await {
            if link.crawled {
                crawled.insert(link.url.clone());
                // The tx_output channel is for IO handling and such in a separate task.
                if let Err(_) = tx_output.send(link).await {
                    return;
                }
            } else {
                let should_crawl = link.should_crawl(&whitelist, &blacklist);
                if should_crawl && !crawled.contains(&link.url) {
                    to_crawl.insert(link.url);
                } else if !should_crawl && !dont_crawl.contains(&link.url) {
                    dont_crawl.insert(link.url.clone());
                    if let Err(_) = tx_output.send(link).await {
                        return;
                    }
                }
            }
        }
    }
}

The whole program can be found here if someone needs it.

Ayush Singh
  • "So I am just wondering what might be a good way to add a limit to tasks spawned?" why? Tasks are lightweight concurrency mechanisms, they don't do any parallelism in and of themselves, the parallelism is handled by the runtime: tasks get run on a scheduler, and by default Tokio spawns one scheduler per (virtual) core. If you want to do all that by hand, you should not bother with Tokio and should just manage a threadpool. – Masklinn Mar 06 '21 at 12:37
  • 1
    You also seem to have some misunderstandings about the async system e.g. your `for_each_concurrent` doesn't actually do anything useful, since the closure has no yield points. – Masklinn Mar 06 '21 at 12:38
  • If your goal is to avoid overloading the crawled site with an insane number of requests, then [a semaphore](https://tokio-rs.github.io/tokio/doc/tokio/sync/struct.Semaphore.html) is probably what you want: configure the semaphore with however many concurrent requests are acceptable, then have the tasks acquire the semaphore before performing a request and drop the guard afterwards (see the semaphore sketch after these comments). – Masklinn Mar 06 '21 at 12:44
  • Well, the idea of using a semaphore is good, but my main objective is to limit how many tasks are spawned. E.g. I have 1M+ different URLs, or some arbitrarily large number, and I spawn a new task for each URL. I understand that Tokio tasks are lightweight and it uses M:N threading, but I think keeping an upper limit is required, like in the parallel part of the answer [here](https://stackoverflow.com/questions/51044467/how-can-i-perform-parallel-asynchronous-http-get-requests-with-reqwest). – Ayush Singh Mar 06 '21 at 12:48
  • Yeah, I realized that for_each_concurrent isn't doing anything and I have replaced it in the local code. Thanks for that. – Ayush Singh Mar 06 '21 at 12:50
  • @Masklinn Limiting also helps minimize how many sites are being processed in parallel, which reduces memory usage. I am not sure, but there might be a point of diminishing returns in spawning. – Ayush Singh Mar 06 '21 at 12:54
  • If you are dealing with a truly huge (or unbounded) number of requests, then a semaphore is not optimal. In that case I'd create a _fixed_ number of tasks that reflects the number of parallel requests you're after, and would use an async channel to feed URLs to those tasks (see the worker-pool sketch after these comments). Mutable hash sets shouldn't be an issue, just use a `Mutex` - the time the mutex is held when it needs to be briefly updated will be dwarfed by the time it takes to download stuff off the network by many orders of magnitude. The nice thing about Rust is that it won't let you "forget" to lock the mutex. – user4815162342 Mar 08 '21 at 08:57
  • @user4815162342 That seems like one possible solution. I will try it out later. – Ayush Singh Mar 08 '21 at 18:34
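
A minimal sketch of the semaphore idea from the comments, assuming Tokio 1.x; every task is still spawned, but only a fixed number of requests run at once (the URLs and the limit of 8 are placeholders):

use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    // At most 8 requests in flight at once; the other tasks wait for a permit.
    let semaphore = Arc::new(Semaphore::new(8));
    let urls = vec!["https://example.com/a", "https://example.com/b"];

    let mut handles = Vec::new();
    for url in urls {
        let client = client.clone();
        let semaphore = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            // The permit is released when `_permit` is dropped at the end of the task.
            let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
            client.get(url).send().await?.bytes().await
        }));
    }

    for handle in handles {
        match handle.await {
            Ok(Ok(bytes)) => println!("got {} bytes", bytes.len()),
            Ok(Err(e)) => eprintln!("request error: {}", e),
            Err(e) => eprintln!("task failed: {}", e),
        }
    }
}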
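
And a minimal sketch of the fixed-worker-pool idea, sharing one work queue between the workers by wrapping the receiver in a `Mutex` (the URLs, pool size, and channel capacity are placeholders):

use std::collections::HashSet;
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let crawled = Arc::new(Mutex::new(HashSet::<String>::new()));

    // One shared work queue; the receiver is wrapped in a Mutex so the
    // fixed pool of workers can take turns pulling URLs from it.
    let (tx_urls, rx_urls) = mpsc::channel::<String>(100);
    let rx_urls = Arc::new(Mutex::new(rx_urls));

    let mut workers = Vec::new();
    for _ in 0..8 {
        let client = client.clone();
        let crawled = Arc::clone(&crawled);
        let rx_urls = Arc::clone(&rx_urls);
        workers.push(tokio::spawn(async move {
            loop {
                // The receiver lock is released as soon as a URL (or None) arrives,
                // so the download below happens without holding it.
                let url = match rx_urls.lock().await.recv().await {
                    Some(url) => url,
                    None => break, // sender dropped: no more work
                };
                if let Ok(resp) = client.get(&url).send().await {
                    let _ = resp.bytes().await;
                    // The set is locked only for this brief insert, not the download.
                    crawled.lock().await.insert(url);
                }
            }
        }));
    }

    for url in ["https://example.com/a", "https://example.com/b"] {
        tx_urls.send(url.to_string()).await.expect("workers gone");
    }
    drop(tx_urls); // closing the channel lets the workers exit their loops

    for worker in workers {
        worker.await.expect("worker panicked");
    }
    println!("crawled {} URLs", crawled.lock().await.len());
}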

0 Answers