From f1971eb673c55afe9836484e91715200410af5bb Mon Sep 17 00:00:00 2001
From: Baitinq
Date: Fri, 21 Oct 2022 21:16:54 +0200
Subject: Crawler: Implement basic async functionality

---
 crawler/Cargo.toml  |  1 +
 crawler/src/main.rs | 84 ++++++++++++++++++++++++++++-------------------------
 2 files changed, 46 insertions(+), 39 deletions(-)

(limited to 'crawler')

diff --git a/crawler/Cargo.toml b/crawler/Cargo.toml
index 486729a..cd828ad 100644
--- a/crawler/Cargo.toml
+++ b/crawler/Cargo.toml
@@ -11,6 +11,7 @@ reqwest = {version = "0.11", features = ["blocking", "json"]}
 scraper = "0.12.0"
 itertools = "0.10.5"
 serde = { version = "1.0", features = ["derive"] }
+tokio = { version = "0.2.22", features = ["full"] }
 
 [[bin]]
 name = "crawler"
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index c086a76..fdb6623 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -2,7 +2,8 @@ use itertools::Itertools;
 use reqwest::blocking::{Client, Response};
 use serde::Serialize;
 
-fn main() {
+#[tokio::main]
+async fn main() {
     println!("Hello, world! Im the crawler!");
 
     let root_urls = include_str!("../top-1000-websites.txt");
@@ -10,55 +11,60 @@ fn main() {
 
     let http_client = reqwest::blocking::Client::new();
 
-    crawler(&http_client, root_urls);
+    crawler(http_client, root_urls).await;
 }
 
-//takes list of strings - multithread here?
-fn crawler(http_client: &Client, root_urls: Vec<&str>) {
+//TODO: crawling depth? - async http client
+async fn crawler(http_client: Client, root_urls: Vec<&str>) {
     println!("Starting to crawl!");
 
     //add root urls to queue - TODO: max q size
     let crawling_queue: blockingqueue::BlockingQueue<String> = blockingqueue::BlockingQueue::new();
-    for url in root_urls {
-        crawling_queue.push(String::from(url));
-    }
+    root_urls
+        .into_iter()
+        .for_each(|u| crawling_queue.push(String::from(u)));
 
     //and start crawling
-    //FIXME: Async!
     loop {
-        //blocks
-        let url = crawling_queue.pop();
-
-        let crawl_res = crawl_url(http_client, url.as_str());
-        if crawl_res.is_err() {
-            println!("Error crawling {}", url);
-            continue;
-        }
-
-        let (content, crawled_urls) = crawl_res.unwrap();
-
-        //println!("Content: {:?}", content);
-        println!("Next urls: {:?}", crawled_urls);
-
-        //push content to index
-        let indexer_res = push_crawl_entry_to_indexer(
-            http_client,
-            String::from("http://127.0.0.1:4444/resource"),
-            url,
-            content,
-        )
-        .unwrap()
-        .text();
-
-        println!("Pushed to indexer {:?}", &indexer_res);
-
-        for url in crawled_urls {
-            crawling_queue.push(url);
-        }
+        //even if we clone, the underlying queue implementation is still shared
+        let crawling_queue = crawling_queue.clone();
+        let http_client = http_client.clone();
+        tokio::spawn(async move {
+            //blocks
+            let url = crawling_queue.pop();
+
+            let crawl_res = crawl_url(&http_client, url.as_str()).await;
+            if crawl_res.is_err() {
+                println!("Error crawling {}", url);
+                return;
+            }
+
+            let (content, crawled_urls) = crawl_res.unwrap();
+
+            //println!("Content: {:?}", content);
+            println!("Next urls: {:?}", crawled_urls);
+
+            //push content to index
+            let indexer_res = push_crawl_entry_to_indexer(
+                &http_client,
+                String::from("http://127.0.0.1:4444/resource"),
+                url,
+                content,
+            )
+            .await
+            .unwrap()
+            .text();
+
+            println!("Pushed to indexer {:?}", &indexer_res);
+
+            crawled_urls
+                .into_iter()
+                .for_each(|u| crawling_queue.push(u));
+        });
     }
 }
 
-fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), String> {
+async fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), String> {
     let url = "https://".to_owned() + url;
 
     println!("Crawling {:?}", url);
@@ -103,7 +109,7 @@ fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), S
     Ok((response_text, next_urls))
 }
 
-fn push_crawl_entry_to_indexer(
+async fn push_crawl_entry_to_indexer(
     http_client: &Client,
     indexer_url: String,
     url: String,
--
cgit 1.4.1
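
For readers skimming the diff: the new loop body clones the shared queue and HTTP client handles and then spawns a tokio task that pops a URL, crawls it, and feeds the discovered links back into the queue. A minimal, self-contained sketch of that spawn-per-unit-of-work shape is below. It is not part of the patch: it assumes tokio 1.x with the "macros", "rt-multi-thread" and "time" features, and the seeds list and process() stub are hypothetical stand-ins for top-1000-websites.txt, crawl_url() and push_crawl_entry_to_indexer().

use std::time::Duration;

// Illustrative stand-in for crawl_url() + push_crawl_entry_to_indexer().
async fn process(url: String) {
    tokio::time::sleep(Duration::from_millis(10)).await; // pretend network I/O
    println!("processed {url}");
}

#[tokio::main]
async fn main() {
    // Hypothetical seed URLs; the commit reads ../top-1000-websites.txt instead.
    let seeds = vec!["example.com".to_string(), "example.org".to_string()];

    // Same shape as the commit's loop body: move owned/cloned data into the
    // task, then let the runtime drive all crawls concurrently.
    let mut tasks = Vec::new();
    for url in seeds {
        tasks.push(tokio::spawn(async move {
            process(url).await;
        }));
    }

    // Wait for every spawned crawl to finish.
    for task in tasks {
        task.await.expect("crawl task panicked");
    }
}

Note that in the patch itself the spawned tasks still make synchronous calls (BlockingQueue::pop() and the reqwest blocking client), which can tie up tokio worker threads, and the outer loop spawns tasks without any bound; tokio::task::spawn_blocking, reqwest's async client, or a bounded channel are the usual follow-ups.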