@ddjerqq
Created December 13, 2022 22:18
extract and save urls
"""
1. Create a web crawler/scraper that uses socket connections (No Selenium) to gather links from webpages and add
them to a process queue.
2. The queue will be processed by P number of processes (However many cores on the machine).
3. Each process will use aiohttp (Async) with max T number of threads/tasks (Variable default: 100) to scrape from
the queue and add to the queue.
4. Store the title of all scraped HTML pages along with their URLs in an SQLITE database file.
"""
from __future__ import annotations

import re
import asyncio as aio
import multiprocessing as mp
import sqlite3
import urllib.request

import aiohttp


class Crawler:
    """Crawler, a web crawler that uses socket connections to gather links from web pages."""

    def __init__(self) -> None:
        # use ":memory:" or a file path such as "app.db", whatever fits your needs
        self._conn = sqlite3.connect(":memory:")
        self._cursor = self._conn.cursor()
        self._create_database()

    def _create_database(self) -> None:
        """Create the database."""
        self._cursor.execute("""
            CREATE TABLE IF NOT EXISTS links
            (
                title TEXT,
                url TEXT
            );
        """)

    def add_record_to_database(self, title: str, link: str) -> None:
        """Add a record to the database."""
        self._cursor.execute("INSERT INTO links VALUES (?, ?)", (title, link))
        self._conn.commit()

    @staticmethod
    async def _process_link(link: str, queue) -> None:
        """Process a single link: fetch it with aiohttp and put (title, url) on the queue."""
        async with aiohttp.ClientSession() as session:
            async with session.get(link) as response:
                html = await response.text()
        # take the first <title> match, falling back to the URL itself
        match = re.search(r"<title>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
        title = match.group(1).strip() if match else link
        queue.put((title, link))

    def _fetch_links(self, website: str, process_queue) -> None:
        """Fetch links from a web address (blocking sockets via urllib) and populate the process queue."""
        with urllib.request.urlopen(website) as response:
            html = response.read().decode("utf-8")
        # take the first <title> match, falling back to the URL itself
        match = re.search(r"<title>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
        title = match.group(1).strip() if match else website
        # record the seed page itself, then queue every absolute link found on it
        self.add_record_to_database(title, website)
        for link in re.findall(r'href=[\'"]?([^\'" >]+)', html):
            if link.startswith("http"):
                process_queue.put(link)

    def process_website(self, website: str) -> list[tuple[str, str]]:
        """Process a website: gather its links, scrape them in parallel, and store the results."""
        # a manager queue can be shared with pool workers (a plain mp.Queue cannot)
        manager = mp.Manager()
        q = manager.Queue()

        print(f"started processing {website}")
        self._fetch_links(website, q)

        # drain the queued links into a list
        links = []
        while not q.empty():
            links.append(q.get())
        print(f"finished processing {website}")
        print(f"found {len(links)} links")

        print("starting processing links")
        # split the links into one chunk per core; each worker process runs its own asyncio loop
        processes = mp.cpu_count()
        chunks = [links[i::processes] for i in range(processes)]
        with mp.Pool(processes=processes) as pool:
            pool.starmap(_scrape_chunk, [(chunk, q) for chunk in chunks])
        print("finished processing links")

        # drain the (title, url) results from the queue and add them to the database
        while not q.empty():
            title, link = q.get()
            self.add_record_to_database(title, link)

        all_records = self._cursor.execute("SELECT * FROM links").fetchall()
        print(f"found {len(all_records)} records")
        return all_records


def _scrape_chunk(links: list[str], queue) -> None:
    """Pool worker: scrape a chunk of links concurrently inside one process.

    Lives at module level so the multiprocessing pool can pickle it.
    """
    async def run() -> None:
        # failed requests are ignored rather than aborting the whole chunk
        tasks = (Crawler._process_link(link, queue) for link in links)
        await aio.gather(*tasks, return_exceptions=True)

    aio.run(run())


async def main() -> None:
    crawler = Crawler()
    links = crawler.process_website("https://gist.github.com/")
    print(links)


if __name__ == "__main__":
    aio.run(main())
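
A minimal sketch of how the stored records could be inspected afterwards, assuming the connection string in Crawler.__init__ is switched from ":memory:" to a file-backed database; the file name app.db is only an illustration, not part of the gist:

import sqlite3

# assumes the crawler was created with sqlite3.connect("app.db") instead of ":memory:"
conn = sqlite3.connect("app.db")
for title, url in conn.execute("SELECT title, url FROM links"):
    print(f"{title}: {url}")
conn.close()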