about summary refs log tree commit diff
path: root/crawler
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2022-10-21 21:16:54 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2022-10-22 02:30:50 +0200
commitf1971eb673c55afe9836484e91715200410af5bb (patch)
tree4bba48436b89a140461e579be77a43f6dde9a70a /crawler
parentCrawler: Add basic indexer communication (diff)
downloadOSSE-f1971eb673c55afe9836484e91715200410af5bb.tar.gz
OSSE-f1971eb673c55afe9836484e91715200410af5bb.tar.bz2
OSSE-f1971eb673c55afe9836484e91715200410af5bb.zip
Crawler: Implement basic async functionality
Diffstat (limited to 'crawler')
-rw-r--r--crawler/Cargo.toml1
-rw-r--r--crawler/src/main.rs84
2 files changed, 46 insertions, 39 deletions
diff --git a/crawler/Cargo.toml b/crawler/Cargo.toml
index 486729a..cd828ad 100644
--- a/crawler/Cargo.toml
+++ b/crawler/Cargo.toml
@@ -11,6 +11,7 @@ reqwest = {version = "0.11", features = ["blocking", "json"]}
 scraper = "0.12.0"
 itertools = "0.10.5"
 serde = { version = "1.0", features = ["derive"] }
+tokio = { version = "0.2.22", features = ["full"] }
 
 [[bin]]
 name = "crawler"
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index c086a76..fdb6623 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -2,7 +2,8 @@ use itertools::Itertools;
 use reqwest::blocking::{Client, Response};
 use serde::Serialize;
 
-fn main() {
+#[tokio::main]
+async fn main() {
     println!("Hello, world! Im the crawler!");
 
     let root_urls = include_str!("../top-1000-websites.txt");
@@ -10,55 +11,60 @@ fn main() {
 
     let http_client = reqwest::blocking::Client::new();
 
-    crawler(&http_client, root_urls);
+    crawler(http_client, root_urls).await;
 }
 
-//takes list of strings - multithread here?
-fn crawler(http_client: &Client, root_urls: Vec<&str>) {
+//TODO: crawling depth? - async http client
+async fn crawler(http_client: Client, root_urls: Vec<&str>) {
     println!("Starting to crawl!");
 
     //add root urls to queue - TODO: max q size
     let crawling_queue: blockingqueue::BlockingQueue<String> = blockingqueue::BlockingQueue::new();
-    for url in root_urls {
-        crawling_queue.push(String::from(url));
-    }
+    root_urls
+        .into_iter()
+        .for_each(|u| crawling_queue.push(String::from(u)));
 
     //and start crawling
-    //FIXME: Async!
     loop {
-        //blocks
-        let url = crawling_queue.pop();
-
-        let crawl_res = crawl_url(http_client, url.as_str());
-        if crawl_res.is_err() {
-            println!("Error crawling {}", url);
-            continue;
-        }
-
-        let (content, crawled_urls) = crawl_res.unwrap();
-
-        //println!("Content: {:?}", content);
-        println!("Next urls: {:?}", crawled_urls);
-
-        //push content to index
-        let indexer_res = push_crawl_entry_to_indexer(
-            http_client,
-            String::from("http://127.0.0.1:4444/resource"),
-            url,
-            content,
-        )
-        .unwrap()
-        .text();
-
-        println!("Pushed to indexer {:?}", &indexer_res);
-
-        for url in crawled_urls {
-            crawling_queue.push(url);
-        }
+        //even if we clone, the underlying queue implementation is still shared
+        let crawling_queue = crawling_queue.clone();
+        let http_client = http_client.clone();
+        tokio::spawn(async move {
+            //blocks
+            let url = crawling_queue.pop();
+
+            let crawl_res = crawl_url(&http_client, url.as_str()).await;
+            if crawl_res.is_err() {
+                println!("Error crawling {}", url);
+                return;
+            }
+
+            let (content, crawled_urls) = crawl_res.unwrap();
+
+            //println!("Content: {:?}", content);
+            println!("Next urls: {:?}", crawled_urls);
+
+            //push content to index
+            let indexer_res = push_crawl_entry_to_indexer(
+                &http_client,
+                String::from("http://127.0.0.1:4444/resource"),
+                url,
+                content,
+            )
+            .await
+            .unwrap()
+            .text();
+
+            println!("Pushed to indexer {:?}", &indexer_res);
+
+            crawled_urls
+                .into_iter()
+                .for_each(|u| crawling_queue.push(u));
+        });
     }
 }
 
-fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), String> {
+async fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), String> {
     let url = "https://".to_owned() + url;
 
     println!("Crawling {:?}", url);
@@ -103,7 +109,7 @@ fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), S
     Ok((response_text, next_urls))
 }
 
-fn push_crawl_entry_to_indexer(
+async fn push_crawl_entry_to_indexer(
     http_client: &Client,
     indexer_url: String,
     url: String,