about summary refs log tree commit diff
path: root/crawler/src/main.rs
blob: 6161578521a4c410c389f5d1b3fdaebdbaeae986 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use itertools::Itertools;
use reqwest::blocking::{Client, Response};
use serde::Serialize;

#[tokio::main]
async fn main() {
    println!("Hello, world! Im the crawler!");

    let root_urls = include_str!("../top-1000-websites.txt");
    let root_urls = root_urls.split('\n').collect();

    let http_client = reqwest::blocking::Client::new();

    crawler(http_client, root_urls).await;
}

//TODO: crawling depth? - async http client
async fn crawler(http_client: Client, root_urls: Vec<&str>) {
    dbg!("Starting to crawl!");

    //add root urls to queue - TODO: max q size
    let (tx_crawling_queue, rx_crawling_queue) = async_channel::bounded::<String>(4444);
    for url in root_urls {
        tx_crawling_queue.send(String::from(url)).await.unwrap();
    }

    //and start crawling
    loop {
        //even if we clone, the underlying queue implementation is still shared
        let tx_crawling_queue = tx_crawling_queue.clone();
        let rx_crawling_queue = rx_crawling_queue.clone();
        //blocks - we move it up here as to at least block for next url and not endesly spawn tasks
        let url = rx_crawling_queue.recv().await.unwrap();
        let http_client = http_client.clone();
        tokio::spawn(async move {
            let crawl_res = crawl_url(&http_client, url.as_str()).await;
            if crawl_res.is_err() {
                dbg!("Error crawling {}", url);
                return;
            }

            let (content, crawled_urls) = crawl_res.unwrap();

            //dbg!("Content: {:?}", &content);
            dbg!("Next urls: {:?}", &crawled_urls);

            //push content to index
            let indexer_res = push_crawl_entry_to_indexer(
                &http_client,
                String::from("http://127.0.0.1:4444/resource"),
                url,
                content,
            )
            .await
            .unwrap()
            .text();

            dbg!("Pushed to indexer {:?}", &indexer_res);

            for url in crawled_urls {
                tx_crawling_queue.send(url).await.unwrap();
            }
        });
    }
}

async fn crawl_url(http_client: &Client, url: &str) -> Result<(String, Vec<String>), String> {
    dbg!("Crawling {:?}", url);

    let response_res = http_client.get(url).send();
    if response_res.is_err() {
        return Err("Error fetching ".to_owned() + url);
    }
    let response_text_res = response_res.unwrap().text();
    if response_text_res.is_err() {
        return Err("Error unwrapping the fetched HTML's text (".to_owned() + url + ")");
    }

    let response_text = response_text_res.unwrap();
    let document = scraper::Html::parse_document(response_text.as_str());

    let link_selector = scraper::Selector::parse("a").unwrap();
    let next_urls = document
        .select(&link_selector)
        .filter_map(|link| link.value().attr("href"))
        .unique()
        .take(2)
        .map(String::from)
        .collect();

    //we need to not append http if already has it
    let fixup_urls = |us: Vec<String>| {
        us.into_iter()
            .map(|u| {
                //https://stackoverflow.com/questions/9646407/two-forward-slashes-in-a-url-src-href-attribute
                if u.starts_with("//") {
                    format!("https:{}", &u)
                } else if u.starts_with('/') {
                    format!("{}{}", &url, &u)
                } else {
                    u
                }
            })
            .collect()
    };

    let next_urls = fixup_urls(next_urls);
    //limit to 2 or smth for ram? or depth
    //normalise words somewhere
    //fuzzy?
    //probs lots of places where we can borrow or not do stupid stuff
    //search for phrases?
    //why multiple '/' at the end of sites"

    Ok((response_text, next_urls))
}

async fn push_crawl_entry_to_indexer(
    http_client: &Client,
    indexer_url: String,
    url: String,
    content: String,
) -> Result<Response, String> {
    dbg!("Pushin to indexer");

    #[derive(Serialize, Debug)]
    struct Resource {
        url: String,
        content: String,
    }

    let request_body = Resource { url, content };

    let response_res = http_client.post(&indexer_url).json(&request_body).send();
    if response_res.is_err() {
        return Err(format!(
            "Error pushing the crawler to indexer! {:?}",
            &indexer_url
        ));
    }

    Ok(response_res.unwrap())
}