
An async web crawler from scratch

~6 mins read
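
The whole crawler is a single AsyncCrawler class: it fetches pages with aiohttp, pulls links out of the HTML with BeautifulSoup, throttles itself with an asyncio.Semaphore, and collects a sitemap that maps each host to the paths found on it. The full listing follows.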

"""
Create a sitemap by following links and fetching the pages concurrently
"""

import asyncio
import collections
import logging
import urllib.parse
from pprint import pprint

import aiohttp
import bs4


class AsyncCrawler:
    """ a concurrent web crawler """

    def __init__(self, max_concurrency=None):
        self.start_url = None
        self.root_netloc = None
        self.session = None

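        # URLs move through three states: discovered (todo), being fetched (busy), finished (done)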
        self.todo = set()
        self.busy = set()
        self.done = set()

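        # Cap the number of requests in flight; a slot is taken in add_url() and freed in crawl()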
        if max_concurrency is None:
            max_concurrency = 400
        self.sem = asyncio.Semaphore(max_concurrency)

        self.timeout = 16  # seconds

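        # Maps each host (netloc) to the set of paths discovered on it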
        self.sitemap = collections.defaultdict(set)

    async def fetch(self, url):
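        # Only 200 responses yield a body; anything else falls through and returns None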
        async with self.session.get(url) as response:
            if response.status == 200:
                return await response.content.read()

    async def parse(self, data, url):
        soup = bs4.BeautifulSoup(data, features="html.parser")
        links = set(a.get("href") for a in soup.find_all("a", href=True))
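        # Hand each link off as its own task so parse() returns right away and
        # crawl() can release its semaphore slot instead of blocking in add_url()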
        for link in links:
            asyncio.create_task(self.filter_url(link, url))

    async def crawl(self, url):
        self.todo.remove(url)
        self.busy.add(url)
        try:
            data = await self.fetch(url)
            if data:
                await self.parse(data, url)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Timeouts raise asyncio.TimeoutError, which ClientError does not cover
            logging.info(f"{url} failed: {e!r}")
        finally:
            self.busy.remove(url)
            self.done.add(url)
            print(
                f"{len(self.todo)} todo, {len(self.busy)} pending, {len(self.done)} done"
            )
            self.sem.release()

    async def filter_url(self, url, parent_url):
        """ Crawl all links to a domain and its sub-domains """
        url = urllib.parse.urljoin(parent_url, url)
        url, frag = urllib.parse.urldefrag(url)
        parsed_link = urllib.parse.urlparse(url)
        is_same_domain = self.root_netloc in parsed_link.netloc
        is_relevant_url = (
                is_same_domain
                and url not in self.todo
                and url not in self.busy
                and url not in self.done
        )
        if is_relevant_url:
            self.sitemap[parsed_link.netloc].add(parsed_link.path)
            await self.add_url(url)

    async def add_url(self, url):
        self.todo.add(url)
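        # Wait here for a free concurrency slot; crawl() releases it in its finally block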
        await self.sem.acquire()
        asyncio.create_task(self.crawl(url))

    async def run(self):
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        # ClientSession is for connection pooling and HTTP keep-alives
        self.session = aiohttp.ClientSession(timeout=timeout)
        crawl = asyncio.create_task(self.add_url(self.start_url))
        # Give the first task a moment to start, then poll until nothing is
        # queued (todo) or in flight (busy); checking only busy could exit too
        # early, before a newly discovered URL has actually started crawling.
        await asyncio.sleep(1)
        while self.todo or self.busy:
            await asyncio.sleep(1)
        await crawl
        await self.session.close()

    def start(self, start_url):
        self.start_url = start_url
        self.root_netloc = urllib.parse.urlparse(start_url).netloc
        # asyncio.run() creates, runs and closes the event loop for us
        asyncio.run(self.run())
        return self.sitemap


def test_crawler():
    start_url = "https://example.com"
    c = AsyncCrawler()
    sitemap = c.start(start_url)
    pprint(sitemap)


if __name__ == "__main__":
    test_crawler()
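
That is the whole thing. The only third-party dependencies are aiohttp and beautifulsoup4 (imported as bs4), so something like pip install aiohttp beautifulsoup4 should be all the setup needed on Python 3.7 or newer. Point start_url at a site you are allowed to crawl and the pprint call at the end dumps the collected sitemap.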
