Created January 17, 2018 03:24
A web crawler for IGCSE past papers on PapaCambridge.
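A rough usage sketch, with assumptions: save the code below under any name, say papacambridge_crawler.py, and run it with Python 3 after installing the third-party packages it imports (requests, beautifulsoup4 and lxml). At the prompt, enter one or more 4-digit CIE IGCSE course codes separated by spaces, e.g. 0620 0455. The script then downloads the matching question papers, mark schemes, inserts and pre-release materials (the qp, ms, in and pre file types) into a PapaCambridge/<course name>/ folder in the working directory.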
from bs4 import BeautifulSoup
from threading import Thread
import requests
import re
import shutil
import os
import time

MAX_THREAD = 30
ROOT_DIR_NAME = 'PapaCambridge'
ROOT_URL = 'http://pastpapers.papacambridge.com/'
LOG_LEVELS = {0: 'DEBUG', 1: 'INFO', 2: 'WARNING', 3: 'ERROR', 4: 'CRITICAL'}
LOG_COLORS = {'DEBUG': '', 'INFO': '\033[36m', 'WARNING': '\033[93m', 'ERROR': '\033[91m', 'CRITICAL': '\u001b[48;5;9m'}
def log(level, msg, criticalPause=True):
    # Print a timestamped, colour-coded log message; optionally pause on critical errors.
    currentTime = time.strftime('%Y-%m-%d %H:%M:%S')
    level = LOG_LEVELS.get(level, level)
    logMsg = '{} {}\033[1m[{}]\033[0m {}'
    print(logMsg.format(currentTime, LOG_COLORS.get(level, ''), level, msg))
    if level == 'CRITICAL' and criticalPause:
        input('A critical error occurred, the program has paused, press enter to continue.')
def getUrl(url, **options):
    # Fetch a URL with requests; log any failure and signal it by returning -1.
    try:
        web = requests.get(url, **options)
    except requests.exceptions.Timeout:
        log(3, 'Timeout, url: {}'.format(url))
        return -1
    except requests.exceptions.ConnectionError:
        log(3, 'Connection error, url: {}'.format(url))
        return -1
    except Exception as e:
        log(3, 'Uncaught exception ({}): {} when requesting url: {}'.format(e.__class__.__name__, e, url))
        return -1
    else:
        return web
def getCourseUrls(*courses):
    courses = list(courses)
    # regex = re.compile(r'(?:.* \((\d{4})\))|(?:.* - (\d{4}))')
    regex = re.compile(r'\d{4}')
    url = ROOT_URL + '?dir=Cambridge%20International%20Examinations%20%28CIE%29/IGCSE'
    web = getUrl(url)
    if web == -1:
        exit()
    soup = BeautifulSoup(web.text, 'lxml')
    for courseTitle in soup.select('span.file-name'):
        courseId = regex.findall(courseTitle.get_text())
        if courseId and courseId[0] in courses:
            courses.remove(courseId[0])
            yield courseTitle.get_text().strip(), ROOT_URL + courseTitle.parent.parent.get('href')
    if courses:
        log(2, 'Course number(s) not found: {}'.format(', '.join(courses)))
def getPaperUrls(courseUrl, crawlFileTypes=('qp', 'ms', 'in', 'pre')):
    # Matches paper file names such as 0620_s17_qp_12.pdf for the requested file types.
    regex = re.compile(r'[0-9]{{4}}_[swm][0-9]{{1,2}}_({})_[0-9]{{1,2}}\.pdf'.format('|'.join(crawlFileTypes)))
    courseWeb = getUrl(courseUrl)
    if courseWeb == -1:
        return
    courseSoup = BeautifulSoup(courseWeb.text, 'lxml')
    for folder in courseSoup.select('span.file-name'):
        if folder.get_text().strip() == '..':
            continue
        web = getUrl(ROOT_URL + folder.parent.parent.get('href'))
        if web == -1:
            return
        soup = BeautifulSoup(web.text, 'lxml')
        for file in soup.select('span.file-name'):
            if regex.findall(file.get_text()):
                yield ROOT_URL + file.parent.parent.get('href').replace('view.php?id=', '')
def downloadFile(url, dirName):
    name = url.split('/')[-1]
    req = getUrl(url, timeout=60, stream=True)
    if req == -1:
        return -1
    if req.status_code == 200:
        with open(os.path.join(ROOT_DIR_NAME, dirName, name), 'wb') as file:
            try:
                req.raw.decode_content = True
                shutil.copyfileobj(req.raw, file)
            except Exception as e:
                log(3, 'Uncaught exception ({}): {} when copying file object: {}'.format(e.__class__.__name__, e, name))
                return -1
    else:
        log(3, 'Error response [{}], url: {}'.format(req.status_code, url))
        return -1
try:
    courses = input('Input the course numbers, separate using spaces: ')
    log(1, 'Getting course URLs')
    if not os.path.isdir(ROOT_DIR_NAME):
        os.mkdir(ROOT_DIR_NAME)
    for courseName, courseUrl in getCourseUrls(*courses.split()):
        log(1, 'Crawling: \033[4m{}\033[0m'.format(courseName))
        if not os.path.isdir(os.path.join(ROOT_DIR_NAME, courseName)):
            os.mkdir(os.path.join(ROOT_DIR_NAME, courseName))
        allPapers = list(getPaperUrls(courseUrl))
        log(1, 'Downloading {} files'.format(len(allPapers)))
        threadPool = []
        for fileUrl in allPapers:
            thread = Thread(target=downloadFile, args=(fileUrl, courseName))
            threadPool.append(thread)
            if len(threadPool) == MAX_THREAD:
                # Start the current batch of download threads, then wait for all of them to finish.
                for thread in threadPool:
                    thread.start()
                for thread in threadPool:
                    thread.join()
                threadPool = []
        # Run whatever is left over after the last full batch.
        for thread in threadPool:
            thread.start()
        for thread in threadPool:
            thread.join()
    log(1, 'Completed!')
except KeyboardInterrupt:
    log(1, 'Keyboard interrupted')
    exit()