author     Baitinq <manuelpalenzuelamerino@gmail.com>  2022-10-23 12:05:16 +0200
committer  Baitinq <manuelpalenzuelamerino@gmail.com>  2022-10-23 12:05:19 +0200
commit     37414fda51b18096a1834edf2ea0fe53191325e3 (patch)
tree       f3628e87d22115f68f7ce7ee5f499244a29c5203
parent     Indexer: Listen on 0.0.0.0 (diff)
Crawler: Change blockingqueue to channels
We now use the async-channel crate's channel implementation. This allows
us to have bounded async channels.
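
For context, a minimal sketch of how a bounded async-channel behaves (not
from this repository; the capacity, names, and tokio runtime below are
illustrative assumptions):

    use async_channel::bounded;

    #[tokio::main]
    async fn main() {
        // Bounded channel with capacity 2: send() suspends once 2 messages
        // are queued, instead of growing the queue without limit.
        let (tx, rx) = bounded::<String>(2);

        tokio::spawn(async move {
            for i in 0..4 {
                // Awaits here whenever the channel is full.
                tx.send(format!("url-{i}")).await.unwrap();
            }
            // tx is dropped here, closing the channel.
        });

        // recv() returns Err once the channel is closed and drained.
        while let Ok(url) = rx.recv().await {
            println!("got {url}");
        }
    }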
-rw-r--r--  Cargo.lock           | 40
-rw-r--r--  crawler/Cargo.toml   |  2
-rw-r--r--  crawler/src/main.rs  | 22
3 files changed, 45 insertions(+), 19 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 6ed1be4..8a36c19 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -223,6 +223,17 @@ dependencies = [
 ]
 
 [[package]]
+name = "async-channel"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e14485364214912d3b19cc3435dde4df66065127f05fa0d75c712f36f12c2f28"
+dependencies = [
+ "concurrent-queue",
+ "event-listener",
+ "futures-core",
+]
+
+[[package]]
 name = "autocfg"
 version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -250,12 +261,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "blockingqueue"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "763bb38f196c65b369c68796e6d15b83d7eb63c354f2ab7c03c13edd13f2c4ff"
-
-[[package]]
 name = "brotli"
 version = "3.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -310,6 +315,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "cache-padded"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
+
+[[package]]
 name = "cc"
 version = "1.0.73"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -335,6 +346,15 @@ name = "client"
 version = "0.1.0"
 
 [[package]]
+name = "concurrent-queue"
+version = "1.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af4780a44ab5696ea9e28294517f1fffb421a83a25af521333c838635509db9c"
+dependencies = [
+ "cache-padded",
+]
+
+[[package]]
 name = "convert_case"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -380,7 +400,7 @@ dependencies = [
 name = "crawler"
 version = "0.1.0"
 dependencies = [
- "blockingqueue",
+ "async-channel",
  "itertools",
  "reqwest",
  "scraper",
@@ -494,6 +514,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "event-listener"
+version = "2.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
+
+[[package]]
 name = "fastrand"
 version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/crawler/Cargo.toml b/crawler/Cargo.toml
index cd828ad..1bf6bc9 100644
--- a/crawler/Cargo.toml
+++ b/crawler/Cargo.toml
@@ -6,12 +6,12 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-blockingqueue = "0.1.1"
 reqwest = {version = "0.11", features = ["blocking", "json"]}
 scraper = "0.12.0"
 itertools = "0.10.5"
 serde = { version = "1.0", features = ["derive"] }
 tokio = { version = "0.2.22", features = ["full"] }
+async-channel = "1.7.1"
 
 [[bin]]
 name = "crawler"
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index fdb6623..f8dc226 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -19,20 +19,20 @@ async fn crawler(http_client: Client, root_urls: Vec<&str>) {
     println!("Starting to crawl!");
 
     //add root urls to queue - TODO: max q size
-    let crawling_queue: blockingqueue::BlockingQueue<String> = blockingqueue::BlockingQueue::new();
-    root_urls
-        .into_iter()
-        .for_each(|u| crawling_queue.push(String::from(u)));
+    let (tx_crawling_queue, rx_crawling_queue) = async_channel::bounded::<String>(4444);
+    for url in root_urls {
+        tx_crawling_queue.send(String::from(url)).await.unwrap();
+    }
 
     //and start crawling
     loop {
         //even if we clone, the underlying queue implementation is still shared
-        let crawling_queue = crawling_queue.clone();
+        let tx_crawling_queue = tx_crawling_queue.clone();
+        let rx_crawling_queue = rx_crawling_queue.clone();
+        //blocks - moved up here so we at least wait for the next url instead of endlessly spawning tasks
+        let url = rx_crawling_queue.recv().await.unwrap();
         let http_client = http_client.clone();
         tokio::spawn(async move {
-            //blocks
-            let url = crawling_queue.pop();
-
             let crawl_res = crawl_url(&http_client, url.as_str()).await;
             if crawl_res.is_err() {
                 println!("Error crawling {}", url);
@@ -57,9 +57,9 @@ async fn crawler(http_client: Client, root_urls: Vec<&str>) {
 
             println!("Pushed to indexer {:?}", &indexer_res);
 
-            crawled_urls
-                .into_iter()
-                .for_each(|u| crawling_queue.push(u));
+            for url in crawled_urls {
+                tx_crawling_queue.send(url).await.unwrap();
+            }
         });
     }
 }
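
The loop above clones both channel endpoints into each spawned task; with
async-channel this works because clones are handles onto one shared MPMC
queue (which is what the "even if we clone" comment refers to), so each
message is delivered to exactly one consumer. A hedged sketch of that
property, with made-up names and counts for illustration:

    use async_channel::bounded;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = bounded::<u32>(8);

        // Three cloned receivers all drain the same underlying queue,
        // so each message is consumed by exactly one worker.
        let workers: Vec<_> = (0..3)
            .map(|id| {
                let rx = rx.clone();
                tokio::spawn(async move {
                    while let Ok(n) = rx.recv().await {
                        println!("worker {id} got {n}");
                    }
                })
            })
            .collect();

        for n in 0..9 {
            tx.send(n).await.unwrap();
        }
        drop(tx); // close the channel so the workers' recv() loops end

        for w in workers {
            w.await.unwrap();
        }
    }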