1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
use itertools::Itertools;
use reqwest::blocking::{Client, Response};
use serde::Serialize;
use url::Url;
#[tokio::main]
async fn main() {
println!("Hello, world! Im the crawler!");
let root_urls = include_str!("../top-1000-websites.txt");
let root_urls = root_urls.split('\n').collect();
let http_client = reqwest::blocking::Client::new();
crawler(http_client, root_urls).await;
}
//TODO: crawling depth? - async http client
async fn crawler(http_client: Client, root_urls: Vec<&str>) {
dbg!("Starting to crawl!");
//add root urls to queue - TODO: max q size
let (tx_crawling_queue, rx_crawling_queue) = async_channel::bounded::<String>(2222);
for url in root_urls {
tx_crawling_queue.send(String::from(url)).await.unwrap();
}
//and start crawling
loop {
//even if we clone, the underlying queue implementation is still shared
let tx_crawling_queue = tx_crawling_queue.clone();
let rx_crawling_queue = rx_crawling_queue.clone();
//blocks - we move it up here as to at least block for next url and not endesly spawn tasks
let url = rx_crawling_queue.recv().await.unwrap();
let http_client = http_client.clone();
tokio::spawn(async move {
let crawl_res = crawl_url(&http_client, url.as_str()).await;
if crawl_res.is_err() {
dbg!("Error crawling {}", url);
return;
}
let (content, crawled_urls) = crawl_res.unwrap();
//dbg!("Content: {:?}", &content);
dbg!("Next urls: {:?}", &crawled_urls);
//push content to index
let indexer_res = push_crawl_entry_to_indexer(
&http_client,
String::from("http://127.0.0.1:4444/resource"),
url,
content,
)
.await
.unwrap()
.text();
dbg!("Pushed to indexer {:?}", &indexer_res);
for url in crawled_urls {
tx_crawling_queue.send(url).await.unwrap();
}
});
}
}
async fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), String> {
dbg!("Crawling {:?}", url);
let url = Url::parse(url).unwrap();
let response_res = http_client.get(url.as_str()).send();
if response_res.is_err() {
return Err("Error fetching ".to_owned() + url.as_str());
}
let response_text_res = response_res.unwrap().text();
if response_text_res.is_err() {
return Err("Error unwrapping the fetched HTML's text (".to_owned() + url.as_str() + ")");
}
let response_text = response_text_res.unwrap();
let document = scraper::Html::parse_document(response_text.as_str());
let valid_url = |check_url: &Url| match check_url {
u if !(u.scheme() == "http" || u.scheme() == "https") => false,
u if u.fragment().is_some() => false, //no # urls
u if u.query().is_some() => false, //no ? urls
u if *u == url => false, //no same url
_ => true,
};
let link_selector = scraper::Selector::parse("a").unwrap();
let next_urls = document
.select(&link_selector)
.filter_map(|link| link.value().attr("href"))
.unique()
.map(|u| url.join(u))
.filter(Result::is_ok)
.map(Result::unwrap)
.filter(valid_url)
.take(2)
.map(String::from)
.collect();
//normalise words somewhere
//fuzzy? - iterate over keys
//probs lots of places where we can borrow or not do stupid stuff
//search for phrases?
//why multiple '/' at the end of sites"
//http workings lagging behind crawler, what to do?
//group responses and transmit them in an array of 10 or smth -> or maybe just lower q size
//use structs in database indexer
//we need words priority or word list or smth (or in value of database show number of occurance or just val of importance of occurances)
//i dont understand dbg! (how to print {})
//is there empty urls?
//do proper matches instead of unwraps
println!("Returning next urls, {:?}", next_urls);
Ok((response_text, next_urls))
}
async fn push_crawl_entry_to_indexer(
http_client: &Client,
indexer_url: String,
url: String,
content: String,
) -> Result<Response, String> {
dbg!("Pushin to indexer");
#[derive(Serialize, Debug)]
struct Resource {
url: String,
content: String,
}
let request_body = Resource { url, content };
let response_res = http_client.post(&indexer_url).json(&request_body).send();
if response_res.is_err() {
return Err(format!(
"Error pushing the crawler to indexer! {:?}",
&indexer_url
));
}
Ok(response_res.unwrap())
}
|