From 37414fda51b18096a1834edf2ea0fe53191325e3 Mon Sep 17 00:00:00 2001
From: Baitinq
Date: Sun, 23 Oct 2022 12:05:16 +0200
Subject: Crawler: Change blockingqueue to channels

We now use the async-channel crate's channel implementation. This
allows us to have bounded async channels.
---
 Cargo.lock          | 40 +++++++++++++++++++++++++++++++++-------
 crawler/Cargo.toml  |  2 +-
 crawler/src/main.rs | 22 +++++++++++-----------
 3 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6ed1be4..8a36c19 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -222,6 +222,17 @@ dependencies = [
  "alloc-no-stdlib",
 ]
 
+[[package]]
+name = "async-channel"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e14485364214912d3b19cc3435dde4df66065127f05fa0d75c712f36f12c2f28"
+dependencies = [
+ "concurrent-queue",
+ "event-listener",
+ "futures-core",
+]
+
 [[package]]
 name = "autocfg"
 version = "1.1.0"
@@ -249,12 +260,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "blockingqueue"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "763bb38f196c65b369c68796e6d15b83d7eb63c354f2ab7c03c13edd13f2c4ff"
-
 [[package]]
 name = "brotli"
 version = "3.3.4"
@@ -309,6 +314,12 @@ dependencies = [
  "bytes 1.2.1",
 ]
 
+[[package]]
+name = "cache-padded"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
+
 [[package]]
 name = "cc"
 version = "1.0.73"
@@ -334,6 +345,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 name = "client"
 version = "0.1.0"
 
+[[package]]
+name = "concurrent-queue"
+version = "1.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af4780a44ab5696ea9e28294517f1fffb421a83a25af521333c838635509db9c"
+dependencies = [
+ "cache-padded",
+]
+
 [[package]]
 name = "convert_case"
 version = "0.4.0"
@@ -380,7 +400,7 @@ dependencies = [
 name = "crawler"
 version = "0.1.0"
 dependencies = [
- "blockingqueue",
+ "async-channel",
  "itertools",
  "reqwest",
  "scraper",
@@ -493,6 +513,12 @@ dependencies = [
  "cfg-if 1.0.0",
 ]
 
+[[package]]
+name = "event-listener"
+version = "2.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
+
 [[package]]
 name = "fastrand"
 version = "1.8.0"
diff --git a/crawler/Cargo.toml b/crawler/Cargo.toml
index cd828ad..1bf6bc9 100644
--- a/crawler/Cargo.toml
+++ b/crawler/Cargo.toml
@@ -6,12 +6,12 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-blockingqueue = "0.1.1"
 reqwest = {version = "0.11", features = ["blocking", "json"]}
 scraper = "0.12.0"
 itertools = "0.10.5"
 serde = { version = "1.0", features = ["derive"] }
 tokio = { version = "0.2.22", features = ["full"] }
+async-channel = "1.7.1"
 
 [[bin]]
 name = "crawler"
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index fdb6623..f8dc226 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -19,20 +19,20 @@ async fn crawler(http_client: Client, root_urls: Vec<&str>) {
     println!("Starting to crawl!");
 
     //add root urls to queue - TODO: max q size
-    let crawling_queue: blockingqueue::BlockingQueue<String> = blockingqueue::BlockingQueue::new();
-    root_urls
-        .into_iter()
-        .for_each(|u| crawling_queue.push(String::from(u)));
+    let (tx_crawling_queue, rx_crawling_queue) = async_channel::bounded::<String>(4444);
+    for url in root_urls {
+        tx_crawling_queue.send(String::from(url)).await.unwrap();
+    }
 
     //and start crawling
     loop {
         //even if we clone, the underlying queue implementation is still shared
-        let crawling_queue = crawling_queue.clone();
+        let tx_crawling_queue = tx_crawling_queue.clone();
+        let rx_crawling_queue = rx_crawling_queue.clone();
+        //blocks - we move it up here so that we at least block waiting for the next url and don't endlessly spawn tasks
+        let url = rx_crawling_queue.recv().await.unwrap();
         let http_client = http_client.clone();
         tokio::spawn(async move {
-            //blocks
-            let url = crawling_queue.pop();
-
             let crawl_res = crawl_url(&http_client, url.as_str()).await;
             if crawl_res.is_err() {
                 println!("Error crawling {}", url);
@@ -57,9 +57,9 @@ async fn crawler(http_client: Client, root_urls: Vec<&str>) {
 
             println!("Pushed to indexer {:?}", &indexer_res);
 
-            crawled_urls
-                .into_iter()
-                .for_each(|u| crawling_queue.push(u));
+            for url in crawled_urls {
+                tx_crawling_queue.send(url).await.unwrap();
+            }
         });
     }
 }
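For reference, below is a minimal, self-contained sketch of the bounded-channel
pattern this patch adopts. The async-channel version matches the one pinned
above; the capacity, seed url, iteration count, and the discover() helper are
illustrative stand-ins, not part of the crawler.

// Sketch only: assumes async-channel 1.7.1 and tokio with the "full"
// feature, as pinned in crawler/Cargo.toml.

// Hypothetical stand-in for crawl_url(): pretends each page links to one more.
async fn discover(url: &str) -> Vec<String> {
    vec![format!("{}/next", url)]
}

#[tokio::main]
async fn main() {
    // bounded() is the point of the change: send().await suspends while the
    // channel already holds `cap` items, so a full queue applies backpressure
    // instead of growing without limit like the old BlockingQueue.
    let (tx, rx) = async_channel::bounded::<String>(4);

    tx.send(String::from("https://example.com")).await.unwrap();

    for _ in 0..3 {
        // recv() *before* spawning: the loop parks here while the channel is
        // empty, rather than endlessly spawning tasks that each block on a pop.
        let url = rx.recv().await.unwrap();

        // Sender and Receiver are cheap to clone; clones share one queue.
        let tx = tx.clone();
        tokio::spawn(async move {
            for found in discover(&url).await {
                tx.send(found).await.unwrap(); // suspends if the channel is full
            }
        });
    }
}

The same shape appears in crawler(): receiving before tokio::spawn bounds the
number of in-flight tasks by the rate at which urls arrive, and the bounded
send bounds the queue itself.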