I am experimenting with the Tokio runtime in Rust by writing a web crawler.
Right now I spawn a separate task for each link to crawl, since past a certain point parallelism seems to work better than plain concurrency. All communication happens through channels and nothing is returned from the crawl functions, so I'm wondering: what would be a good way to put a limit on the number of spawned tasks?
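To make "limit" concrete, I mean something like capping the number of in-flight tasks, e.g. with a tokio Semaphore as in this rough sketch (the names and types here are placeholders, not my actual code):

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

// Hypothetical sketch: cap the number of concurrently running crawl tasks.
async fn spawn_with_limit(urls: Vec<String>, tx: mpsc::Sender<String>, task_limit: usize) {
    let semaphore = Arc::new(Semaphore::new(task_limit));
    for url in urls {
        // This await blocks once `task_limit` tasks are already in flight.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let tx = tx.clone();
        tokio::spawn(async move {
            // Placeholder for the real crawl_page(url, client, tx) call.
            let _ = tx.send(url).await;
            drop(permit); // Releasing the permit frees a slot for the next task.
        });
    }
}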
I was using the method given in this answer back when I was using function returns, but it doesn't seem very useful with channels.
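For contrast, with function returns a limit is easy to express by running the crawl futures as a stream with bounded concurrency; something like this illustrative sketch (placeholder names, not the linked answer's exact code):

use futures::stream::{self, StreamExt};

// Placeholder: fetch a page and return the links found on it.
async fn crawl_page_returning(_url: String) -> Vec<String> {
    vec![]
}

async fn crawl_level(to_crawl: Vec<String>, task_limit: usize) -> Vec<String> {
    // At most `task_limit` crawl futures run at once, and their results
    // are collected as they finish, because each one returns its links.
    stream::iter(to_crawl)
        .map(crawl_page_returning)
        .buffer_unordered(task_limit)
        .collect::<Vec<Vec<String>>>()
        .await
        .into_iter()
        .flatten()
        .collect()
}

With channels there is no return value to collect, so that pattern doesn't carry over directly.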
I also thought about simply taking a limited number of links from to_crawl at a time, but that makes it difficult to keep track of depth.
I can't just move the channel-receiving part into a separate task either, since I'm currently using mutable HashSets to keep track of links, and sharing those would get messy with mutex locking and unlocking.
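Moving the receiver out would mean putting those sets behind something like Arc<Mutex<...>>, roughly as in this sketch (simplified to String keys), which is the locking I'd rather avoid:

use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

// Rough sketch only: the receiver in its own task, with the bookkeeping
// sets shared behind a tokio Mutex.
fn spawn_receiver(
    mut rx: mpsc::Receiver<String>,
    crawled: Arc<Mutex<HashSet<String>>>,
    to_crawl: Arc<Mutex<HashSet<String>>>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        while let Some(url) = rx.recv().await {
            // Every check-and-insert now needs a lock.
            if !crawled.lock().await.contains(&url) {
                to_crawl.lock().await.insert(url);
            }
        }
    })
}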
Anyway, I would also love to hear other people's thoughts on whether channels should even be used over function returns in a program like this, because I pretty much decided to do it on a whim after hearing this talk by Rich Hickey.
The main crawler is something like this:
use std::collections::HashSet;

use futures::{stream, StreamExt};
use reqwest::Url;
use tokio::sync::mpsc;

// `Link` and `crawl_page` are defined elsewhere in the program.
pub async fn crawl_with_depth(
    origin_url: Link,
    crawl_depth: usize,
    whitelist: Option<HashSet<String>>,
    blacklist: Option<HashSet<String>>,
    tx_output: mpsc::Sender<Link>,
    task_limit: usize,
) {
    let mut to_crawl: HashSet<Url> = HashSet::new();
    let mut crawled: HashSet<Url> = HashSet::new();
    let mut dont_crawl: HashSet<Url> = HashSet::new();
    let client = reqwest::Client::new();
    to_crawl.insert(origin_url.url);

    for _ in 0..crawl_depth {
        println!("Crawling {} URLs", to_crawl.len());

        // Spawn one task per URL at this depth; results come back over the channel.
        let (tx_crawler, mut rx_crawler) = mpsc::channel::<Link>(task_limit);
        stream::iter(to_crawl.clone())
            .for_each(|x| async {
                let tx_clone = tx_crawler.clone();
                let client_clone = client.clone();
                tokio::spawn(async move { crawl_page(x, client_clone, tx_clone).await });
            })
            .await;
        to_crawl.clear();
        drop(tx_crawler);

        while let Some(link) = rx_crawler.recv().await {
            if link.crawled {
                crawled.insert(link.url.clone());
                // The tx_output channel is for IO handling and such in a separate task.
                if let Err(_) = tx_output.send(link).await {
                    return;
                }
            } else {
                let should_crawl = link.should_crawl(&whitelist, &blacklist);
                if should_crawl && !crawled.contains(&link.url) {
                    to_crawl.insert(link.url);
                } else if !should_crawl && !dont_crawl.contains(&link.url) {
                    dont_crawl.insert(link.url.clone());
                    if let Err(_) = tx_output.send(link).await {
                        return;
                    }
                }
            }
        }
    }
}
The whole program can be found here in case anyone needs it.