# OXAM Crawler.py
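# Crawls past exam papers from the OXAM tool on Oxford's WebLearn. The script opens a
# Chrome window for a manual login, copies the session cookies into a requests session,
# lets the user pick courses via a fuzzy keyword search, then fetches each course's
# paper listings and downloads every file concurrently into the OXAM/ directory.
#
# Rough usage sketch (assumes Chrome plus a matching chromedriver are installed, along
# with the packages requests, beautifulsoup4, selenium, fuzzywuzzy and tqdm):
#   python "OXAM Crawler.py"
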
import re
import shutil
from pathlib import Path
from queue import Queue, Empty
from threading import Thread, Event

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from fuzzywuzzy import fuzz
from tqdm import tqdm
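
# Configuration: root_domain and main_url point at the OXAM advanced-search page on
# WebLearn, root_dest_path is the directory downloaded papers are written into, and
# num_workers is the number of threads used for both listing and downloading.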
root_domain = 'https://weblearn.ox.ac.uk'
main_url = '/portal/site/:oxam/tool/8a98905b-a664-4618-9200-4ceb2118b0d6/advanced'
root_dest_path = Path('OXAM')
num_workers = 10


def get_authenticated_session() -> requests.Session:
    # Open a real browser for the manual WebLearn login, then copy its cookies
    # into a plain requests session for the actual crawling.
    driver = webdriver.Chrome()
    driver.get(root_domain + main_url)
    input('Please press Enter after logging in.')
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.34'
    }
    sess = requests.Session()
    sess.headers.update(headers)
    for cookie in driver.get_cookies():
        sess.cookies.update({cookie['name']: cookie['value']})
    driver.quit()
    return sess


def get_all_courses() -> list:
    resp = sess.get(root_domain + main_url)
    soup = BeautifulSoup(resp.text, features='html.parser')
    return soup.select('#exam > optgroup > option')


def match_courses(search_text: str, all_courses: list) -> list:
    # Preprocess search text
    search_text = search_text.replace('&', 'and')
    matched_courses = []
    scores = []
    name_extractor_re = re.compile(r'(.{4}) {4}(?:(.+)(?:(?: in )|(?: of )))?(.+)')
    for el in all_courses:
        value = el.get('value')
        name = el.text.replace('\xa0', ' ')
        course_id, course_type, course_name = name_extractor_re.match(name).groups()
        course_name = course_name.rstrip('.')
        assert course_id == value
        # ljust pads the name so it is never shorter than the search text
        score = fuzz.partial_ratio(search_text, course_name.lower().ljust(len(search_text)))
        if score >= 80:  # alternatives: fuzz.token_sort_ratio, fuzz.token_set_ratio
            matched_courses.append({'id': course_id, 'type': course_type, 'name': course_name})
            scores.append(score)
    # Sort matches by descending score
    matched_courses = [c for _, c in sorted(zip(scores, matched_courses), key=lambda x: x[0], reverse=True)]
    return matched_courses
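
# Illustrative example of the scoring used above (hypothetical course name):
#   fuzz.partial_ratio('number theory', 'part b: number theory')  # -> 100
# partial_ratio scores the best-matching substring of the longer string against the
# shorter one; the ljust padding keeps the course name at least as long as the search
# text, so a name that only covers part of the prompt cannot reach a perfect score.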


def get_course_files(url: str, course: dict):
    # Yield all files of a given course URL, recursively following pagination
    resp = sess.get(url)
    soup = BeautifulSoup(resp.text, features='html.parser')
    for el in soup.select('.content > div > ul > li'):
        link_el = el.select_one('a')
        name_els = link_el.select('span')
        info_els = el.select(':scope > span')  # Top-level spans only
        paper_href = link_el.get('href')
        paper_id = name_els[0].text
        paper_title = name_els[1].text.rstrip('.')
        paper_year = info_els[0].text
        paper_term = info_els[1].text
        yield {
            'course': course,
            'paper_id': paper_id,
            'title': paper_title,
            'year': paper_year,
            'term': paper_term,
            'href': paper_href,
        }
    # Recurse into the next page, if there is one
    pagination_buttons = soup.select('.content > div > div > a')
    if not pagination_buttons or pagination_buttons[-1].text != 'Next':
        return
    # parent_url is a module-level global set in the main script below
    next_url = root_domain + parent_url + pagination_buttons[-1].get('href')
    yield from get_course_files(next_url, course)


def download_file(url: str, dest_path: Path, skip_exist: bool = True):
    if skip_exist and dest_path.exists():
        return
    # Ensure parent directory exists
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with sess.get(root_domain + url, stream=True) as resp:
            resp.raise_for_status()
            with dest_path.open('wb') as f:
                shutil.copyfileobj(resp.raw, f)
    except Exception as e:
        print(e)
        print(root_domain + url)
        return


def worker_get_all_course_files():
    while not courses_done.is_set():
        try:
            course = courses_queue.get(block=True, timeout=2.0)
        except Empty:
            if courses_done.is_set():
                return
            continue
        url = f'{root_domain}{main_url}?exam={course["id"]}'
        for file in get_course_files(url, course):
            files_queue.put(file)
        pbar.update(1)
        courses_queue.task_done()


def worker_download_all_files():
    while not files_done.is_set():
        try:
            file = files_queue.get(block=True, timeout=2.0)
        except Empty:
            if files_done.is_set():
                return
            continue
        # TODO: custom file organization
        original_filename = file['href'].split('/')[-1]
        suffix = original_filename.split('.')[-1]
        dest_path = (
            root_dest_path
            / '{}, {}'.format(file['course']['name'], file['course']['type'])
            / '{} {}'.format(file['paper_id'], file['title'])
            / '{} {}.{}'.format(file['year'], file['term'], suffix)
        )
        download_file(file['href'], dest_path)
        pbar.update(1)
        files_queue.task_done()
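

# The main script below is a two-stage producer/consumer pipeline: the selected
# courses go onto courses_queue, worker_get_all_course_files threads turn each
# course into file records on files_queue, and worker_download_all_files threads
# write those files to disk. The courses_done / files_done events let idle workers
# exit once the corresponding queue has been drained and joined.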

##### Get authenticated session through manual login using Selenium #####
sess = get_authenticated_session()
# TODO: check if indeed logged in
print('Login success.')

##### Getting the courses to crawl according to user keywords #####
all_courses = []
while True:
    # Prompt for a search and print the matching courses
    search_text = input('\nSearch prompt: ').lower()
    print('Finding courses...')
    if not all_courses:
        all_courses = get_all_courses()
    matched_courses = match_courses(search_text, all_courses)
    if not matched_courses:
        print('No matched results. Try another prompt.')
        continue
    print('Results:')
    for i, course in enumerate(matched_courses):
        print(f'[{i + 1}] {course["type"]}, {course["name"]}')
    # Select which results to crawl
    reenter_flag = False
    while True:
        selected_course_indices = input('Please select course IDs (separated by spaces), or leave blank to re-enter the search prompt: ').strip().split()
        if not selected_course_indices:
            reenter_flag = True
            break
        try:
            selected_course_indices = list(map(int, selected_course_indices))
            for i in selected_course_indices:
                assert 1 <= i <= len(matched_courses)
            break
        except Exception:
            print('Invalid input. Try again.')
    if reenter_flag:
        continue
    break

selected_courses = []
for i in selected_course_indices:
    selected_courses.append(matched_courses[i - 1])

##### Getting all files #####
courses_queue = Queue()
courses_done = Event()
files_queue = Queue()
files_done = Event()

print('\nFetching files for selected courses...')
parent_url = '/'.join(main_url.split('/')[:-1]) + '/'
for course in selected_courses:
    courses_queue.put(course)
pbar = tqdm(total=len(selected_courses))
workers = []
for t in range(num_workers):
    worker = Thread(target=worker_get_all_course_files)
    workers.append(worker)
    worker.daemon = True
    worker.start()
courses_queue.join()
courses_done.set()
pbar.close()
print('Closing workers...')
for worker in workers:
    worker.join()
total_num_files = files_queue.qsize()
print(f'Done. Total files: {total_num_files}')

##### Download all files #####
print('\nDownloading all files...')
pbar = tqdm(total=total_num_files)
workers = []
for t in range(num_workers):
    worker = Thread(target=worker_download_all_files)
    workers.append(worker)
    worker.daemon = True
    worker.start()
files_queue.join()
files_done.set()
pbar.close()
print('Closing workers...')
for worker in workers:
    worker.join()
print('All done.')