author    | Baitinq <manuelpalenzuelamerino@gmail.com> | 2022-10-23 12:05:16 +0200
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2022-10-23 12:05:19 +0200
commit    | 37414fda51b18096a1834edf2ea0fe53191325e3 (patch)
tree      | f3628e87d22115f68f7ce7ee5f499244a29c5203 /crawler
parent    | Indexer: Listen on 0.0.0.0 (diff)
Crawler: Change blockingqueue to channels
We now use the async-channel crate's channel implementation. This allows us to have bounded async channels.
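As a rough, standalone illustration of what this buys us (not part of the commit; the capacity of 4 and the example urls are made up), async_channel::bounded returns a cloneable sender/receiver pair whose send() suspends once the buffer is full, rather than growing without limit like the old blocking queue:

    // Standalone sketch, assuming the async-channel and tokio crates as dependencies.
    #[tokio::main]
    async fn main() {
        // Capacity of 4 is arbitrary for the example; the commit uses 4444.
        let (tx, rx) = async_channel::bounded::<String>(4);

        let producer = tokio::spawn({
            let tx = tx.clone();
            async move {
                for i in 0..8 {
                    // Suspends (without blocking a thread) while the channel is full.
                    tx.send(format!("https://example.com/{}", i)).await.unwrap();
                }
            }
        });

        drop(tx); // drop our sender so recv() returns Err once the producer is done
        while let Ok(url) = rx.recv().await {
            println!("would crawl {}", url);
        }
        producer.await.unwrap();
    }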
Diffstat (limited to 'crawler')
-rw-r--r-- | crawler/Cargo.toml  |  2
-rw-r--r-- | crawler/src/main.rs | 22
2 files changed, 12 insertions, 12 deletions
diff --git a/crawler/Cargo.toml b/crawler/Cargo.toml
index cd828ad..1bf6bc9 100644
--- a/crawler/Cargo.toml
+++ b/crawler/Cargo.toml
@@ -6,12 +6,12 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-blockingqueue = "0.1.1"
 reqwest = {version = "0.11", features = ["blocking", "json"]}
 scraper = "0.12.0"
 itertools = "0.10.5"
 serde = { version = "1.0", features = ["derive"] }
 tokio = { version = "0.2.22", features = ["full"] }
+async-channel = "1.7.1"
 
 [[bin]]
 name = "crawler"
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index fdb6623..f8dc226 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -19,20 +19,20 @@ async fn crawler(http_client: Client, root_urls: Vec<&str>) {
     println!("Starting to crawl!");
 
     //add root urls to queue - TODO: max q size
-    let crawling_queue: blockingqueue::BlockingQueue<String> = blockingqueue::BlockingQueue::new();
-    root_urls
-        .into_iter()
-        .for_each(|u| crawling_queue.push(String::from(u)));
+    let (tx_crawling_queue, rx_crawling_queue) = async_channel::bounded::<String>(4444);
+    for url in root_urls {
+        tx_crawling_queue.send(String::from(url)).await.unwrap();
+    }
 
     //and start crawling
     loop {
         //even if we clone, the underlying queue implementation is still shared
-        let crawling_queue = crawling_queue.clone();
+        let tx_crawling_queue = tx_crawling_queue.clone();
+        let rx_crawling_queue = rx_crawling_queue.clone();
+        //blocks - we move it up here so as to at least block for the next url and not endlessly spawn tasks
+        let url = rx_crawling_queue.recv().await.unwrap();
         let http_client = http_client.clone();
         tokio::spawn(async move {
-            //blocks
-            let url = crawling_queue.pop();
-
             let crawl_res = crawl_url(&http_client, url.as_str()).await;
             if crawl_res.is_err() {
                 println!("Error crawling {}", url);
@@ -57,9 +57,9 @@ async fn crawler(http_client: Client, root_urls: Vec<&str>) {
 
             println!("Pushed to indexer {:?}", &indexer_res);
 
-            crawled_urls
-                .into_iter()
-                .for_each(|u| crawling_queue.push(u));
+            for url in crawled_urls {
+                tx_crawling_queue.send(url).await.unwrap();
+            }
         });
     }
 }
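For context, a stripped-down, self-contained version of the receive-then-spawn loop the diff arrives at (fetch_and_extract and the capacity of 16 are placeholders, not the crawler's real function or setting): recv().await now sits in the outer loop, so a task is only spawned once a url is actually available, and the bounded sender pushes back when the queue fills up.

    // Simplified sketch of the pattern above; fetch_and_extract stands in for crawl_url.
    async fn fetch_and_extract(_url: &str) -> Vec<String> {
        Vec::new() // a real crawler would fetch the page and return the links it found
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = async_channel::bounded::<String>(16); // capacity is illustrative
        tx.send(String::from("https://example.com")).await.unwrap();

        loop {
            // Wait for the next url here rather than inside the task,
            // so we never spawn more tasks than there are urls to crawl.
            let url = rx.recv().await.unwrap();
            let tx = tx.clone();
            tokio::spawn(async move {
                for found in fetch_and_extract(&url).await {
                    // Suspends when the channel is at capacity (backpressure).
                    tx.send(found).await.unwrap();
                }
            });
        }
    }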