@jiaqi-yin
Last active May 10, 2018 11:39
Python Crawling
import re
import time
import urllib.error
import urllib.request
from datetime import datetime
from urllib.parse import urljoin, urlparse


def download(url, user_agent='wsp', num_tries=2):
    """Download a URL and return its text, retrying on 5xx server errors."""
    print('Downloading:', url)
    headers = {'User-agent': user_agent}
    request = urllib.request.Request(url, headers=headers)
    try:
        response = urllib.request.urlopen(request).read()
    except urllib.error.URLError as e:
        print('Download error:', e.reason)
        if num_tries > 0 and hasattr(e, 'code') and 500 <= e.code < 600:
            # Recursively retry 5xx HTTP errors
            return download(url, user_agent, num_tries - 1)
        return None
    return response.decode('utf-8')


def crawl_sitemap(url):
    # Download the sitemap file
    sitemap = download(url)
    if sitemap is None:
        return
    # Extract the sitemap links
    links = re.findall('<loc>(.*?)</loc>', sitemap)
    # Download each link
    for link in links:
        html = download(link)
        # TODO: scrape html here


def link_crawler(seed_url, link_regex, delay=3, max_depth=-1):
    crawl_queue = [seed_url]
    seen = {seed_url: 0}  # record the depth at which each URL was found
    throttle = Throttle(delay)
    while crawl_queue:
        url = crawl_queue.pop()
        throttle.wait(url)
        html = download(url)
        if html is None:
            continue
        depth = seen[url]
        if depth != max_depth:
            for link in get_links(html):
                if re.findall(link_regex, link):
                    # Resolve relative links against the seed URL
                    link = urljoin(seed_url, link)
                    if link not in seen:
                        seen[link] = depth + 1
                        crawl_queue.append(link)


def get_links(html):
    # Return the href value of every <a> tag in the page
    webpage_regex = re.compile(r'<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    return webpage_regex.findall(html)


class Throttle:
    """Enforce a minimum delay between requests to the same domain."""

    def __init__(self, delay):
        self.delay = delay
        self.domains = {}  # timestamp of the last request to each domain

    def wait(self, url):
        domain = urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).total_seconds()
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()

link_crawler('http://example.webscraping.com', '/(index|view)', delay=1, max_depth=2)
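
# Usage sketch (not part of the original gist): crawl_sitemap assumes the target
# site publishes an XML sitemap; the URL below is a hypothetical example.
# crawl_sitemap('http://example.webscraping.com/sitemap.xml')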