import logging
from datetime import datetime
from django.conf import settings
from django.utils import timezone
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.select import Select
from spire.models import (
Course,
CourseOffering,
Section,
SectionCombinedAvailability,
SectionCoverage,
Subject,
Term,
)
from spire.scraper.academic_calendar import get_or_create_term
from spire.scraper.classes import RawCourse, RawInstructor, RawSection, RawSubject
from spire.scraper.classes.normalizers import REPLACE_DOUBLE_SPACE
from spire.scraper.shared import assert_match, scrape_spire_tables
from spire.scraper.spire_driver import SpireDriver
from spire.scraper.timer import Timer
from spire.scraper.versioned_cache import VersionedCache
log = logging.getLogger(__name__)
def _count_name_characters(s: str):
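    """Count the characters in s, ignoring whitespace and commas."""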
count = 0
for c in s:
if not c.isspace() and c != ",":
count += 1
return count
def _scrape_meeting_instructor_list(sections_table, link_number: str):
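    """Scrape the instructor column of each meeting row for a section, returning one list of RawInstructor per row."""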
log.debug("Scraping meeting instructors...")
meeting_information_table = sections_table.find_element(By.ID, "SSR_CLSRCH_MTG1$scroll$" + link_number)
meeting_instructor_list = []
for meeting_row in meeting_information_table.find_elements(By.CSS_SELECTOR, "tr[id^=trSSR_CLSRCH_MTG]"):
        instructor_column = meeting_row.find_element(By.CSS_SELECTOR, "div[id^=win0divUM_DERIVED_SR_UM_HTML1]")
raw_instructor_text = instructor_column.text.strip()
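        # Collapse repeated blank lines and double spaces left over from the page layout.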
        while "\n\n" in raw_instructor_text or "  " in raw_instructor_text:
            raw_instructor_text = raw_instructor_text.replace("\n\n", "\n").replace("  ", " ")
log.debug("Scraping raw staff from %s", raw_instructor_text)
instructor_list = []
if len(raw_instructor_text) == 0 or raw_instructor_text in ("Staff", "TBD"):
instructor_list.append(RawInstructor(name="Staff"))
else:
found_names = set()
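            # Instructors with published emails appear as mailto: links; take name + email from those first.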
for email_link in instructor_column.find_elements(By.CSS_SELECTOR, "a[href^='mailto:']"):
href = email_link.get_property("href")
staff_name = email_link.text.strip()
if staff_name.endswith(","):
staff_name = staff_name[:-1]
staff_email = href[len("mailto:") :]
instructor_list.append(RawInstructor(name=staff_name, email=staff_email))
found_names.add(staff_name)
log.debug("Scraping raw text...")
possible_names = raw_instructor_text.split(", ")
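            # Names are comma-separated; re-join generational suffixes (e.g. "Jr") with the preceding name.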
skip = False
for i in range(len(possible_names)):
if skip:
skip = False
continue
                if i < len(possible_names) - 1 and possible_names[i + 1] in ("Jr",):
name = f"{possible_names[i]}, {possible_names[i + 1]}"
skip = True
else:
name = possible_names[i]
if name not in found_names:
instructor_list.append(RawInstructor(name=name, email=None))
scraped_names = ", ".join(map(lambda x: x.name, instructor_list))
log.debug("Comparing name characters between '%s' and '%s'", scraped_names, raw_instructor_text)
        assert _count_name_characters(scraped_names) == _count_name_characters(raw_instructor_text)
meeting_instructor_list.append(instructor_list)
log.debug("Scraped meeting instructors: %s", [[str(r) for r in l] for l in meeting_instructor_list])
return meeting_instructor_list
def can_skip(driver: SpireDriver, offering: CourseOffering, spire_id: str, link_number: str):
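    """Decide whether a section's detail page can be skipped in quick mode.

    Returns (True, reason) when the stored Section is not combined and its status,
    enrollment total, and capacity still match the search-results row; otherwise
    (False, reason).
    """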
try:
section = Section.objects.get(spire_id=spire_id, offering=offering)
if SectionCombinedAvailability.objects.filter(individual_availability_id=section.id).first():
return False, "section is combined"
status_icon = driver.find(
f"#win0divDERIVED_CLSRCH_SSR_STATUS_LONG\${link_number} > div > img", By.CSS_SELECTOR
)
src = status_icon.get_attribute("src") # type: ignore
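        # SPIRE reports status only as an icon; translate the icon file name into a status string.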
if src.endswith("PS_CS_STATUS_WAITLIST_ICN_1.gif"):
current_status = "Wait List"
elif src.endswith("PS_CS_STATUS_OPEN_ICN_1.gif"):
current_status = "Open"
elif src.endswith("PS_CS_STATUS_CLOSED_ICN_1.gif"):
current_status = "Closed"
else:
            log.debug("Uncaught image src: %s", src)
assert False
if section.details.status != current_status:
return False, "mismatched status"
ce_elem = driver.find("UM_DERIVED_SR_ENRL_TOT$" + link_number)
assert ce_elem
current_enrollment = int(ce_elem.text)
if section.availability.enrollment_total != current_enrollment:
return False, "mismatched enrollment total"
cc_elem = driver.find("UM_DERIVED_SR_ENRL_CAP$" + link_number)
assert cc_elem
current_capacity = int(cc_elem.text)
if section.availability.capacity != current_capacity:
return False, f"mismatched capacity, {section.availability.capacity} != {current_capacity}"
return True, "OK"
except Section.DoesNotExist:
return False, "does not exist"
# Extremely rare cases where SPIRE lists a malformed course title or section id
COURSE_GROUP_OVERRIDES = {"FILM-ST 391SF": "FILM-ST 391SF S-International SciFi Cinema"}
SPIRE_ID_OVERRIDES = {
"-(62798)": "01-DIS(62798)",
"-(62799)": "01AA-DIS(62799)",
"-(62800)": "01AB-DIS(62800)",
"-(62801)": "01AC-DIS(62801)",
"`01-PRA(52104)": "01-PRA(52104)",
"`01-PRA(72927)": "01-PRA(72927)",
}
def _scrape_search_results(
driver: SpireDriver, cache: VersionedCache, quick: bool, term: Term, subject: Subject
):
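    """Scrape every course group on the current search-results page for (term, subject),
    creating or updating Course, CourseOffering, and Section rows and dropping ones no
    longer listed; returns (scraped_section_count, found_section_count).
    """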
scraped_section_count = 0
found_section_count = 0
scraped_course_ids = set()
for span_id in cache.skip_once(
driver.find_all_ids("span[id^=DERIVED_CLSRCH_DESCR200]"), "course_span_id"
):
cache.push("course_span_id", span_id)
span = driver.find(span_id)
assert span
title_match = assert_match(
r"(?P<subject_id>\S+)\s+(?P<course_number>\S+)\s+(?P<course_title>.+)",
COURSE_GROUP_OVERRIDES.get(span.text, span.text),
)
course_id, _, number = RawCourse.get_course_id(
title_match.group("subject_id"), title_match.group("course_number")
)
scraped_course_ids.add(course_id)
course_title = REPLACE_DOUBLE_SPACE(title_match.group("course_title").strip())
course, created = Course.objects.get_or_create(
id=course_id,
defaults={
"subject": subject,
"number": number,
"title": course_title,
"description": None,
"_updated_at": timezone.now(),
},
)
log.debug("Scraping sections for %s course: %s", "new" if created else "found", course)
offering, created = CourseOffering.objects.get_or_create(
course=course,
term=term,
defaults={
"subject": subject,
"alternative_title": course_title if course_title != course.title else None,
},
)
log.debug("%s course offering: %s", "Created" if created else "Got", offering)
scraped_spire_ids_for_course = set()
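        # The section table's id shares its numeric suffix with the course header span.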
section_table_id = "ACE_DERIVED_CLSRCH_GROUPBOX1$133$" + span_id[len("DERIVED_CLSRCH_DESCR200") :]
sections_table = driver.find(section_table_id)
assert sections_table
link_ids: list[str] = [
e.get_property("id")
for e in sections_table.find_elements(
By.CSS_SELECTOR, "a[id^='DERIVED_CLSRCH_SSR_CLASSNAME_LONG\$']"
)
] # type: ignore
found_section_count += len(link_ids)
for link_id in link_ids:
link = driver.find(link_id)
assert link
t = link.text.strip()
spire_id = SPIRE_ID_OVERRIDES.get(t, t)
scraped_spire_ids_for_course.add(spire_id)
log.debug("Scraping section %s during %s...", spire_id, term.id)
link_number = link_id[len("DERIVED_CLSRCH_SSR_CLASSNAME_LONG$") :]
meeting_instructor_list = _scrape_meeting_instructor_list(
driver.find(section_table_id), link_number
)
log.debug("Scraped meeting instructor list: %s", meeting_instructor_list)
if quick:
can_skip_section, reason = can_skip(driver, offering, spire_id, link_number)
if can_skip_section:
log.info("Skipping section: %s", spire_id)
continue
else:
log.info("Not skipping section: %s - %s", spire_id, reason)
scraped_section_count += 1
log.info("Navigating to section page for %s section %s...", course_id, spire_id)
driver.click(link_id)
table_results = scrape_spire_tables(driver, "table.PSGROUPBOXWBO")
meeting_info_list = []
for row in driver.find_all("tr[id^='trSSR_CLSRCH_MTG$0_row']"):
meeting_info_list.append(
{
"days_and_times": row.find_element(By.CSS_SELECTOR, "span[id^=MTG_SCHED]").text,
"instructors": meeting_instructor_list.pop(),
"room": row.find_element(By.CSS_SELECTOR, "span[id^=MTG_LOC]").text,
"meeting_dates": row.find_element(By.CSS_SELECTOR, "span[id^=MTG_DATE]").text,
}
)
section = RawSection(
spire_id=spire_id,
details=table_results["Class Details"],
meeting_information=meeting_info_list,
restrictions=table_results.get("RESTRICTIONS & NOTES", None),
availability=table_results["Class Availability"],
description=table_results.get("Description", None),
overview=table_results.get("Class Overview", None),
)
log.info("Scraped section:\n%s", section)
log.info("Clicking return then pushing...")
driver.click("CLASS_SRCH_WRK2_SSR_PB_BACK", wait=False)
section.push(offering)
driver.wait_for_spire()
log.info("Returned to search results for %s durning %s.", subject, term)
dropped, _ = (
Section.objects.filter(offering__course=course, offering__term=term)
.exclude(spire_id__in=scraped_spire_ids_for_course)
.delete()
)
log.info("Dropped %s %s sections during %s that are no longer listed.", dropped, course_id, term)
dropped, _ = (
CourseOffering.objects.filter(subject=subject, term=term)
.exclude(course__id__in=scraped_course_ids)
.delete()
)
log.info("Dropped %s %s course offerings during %s that are no longer listed.", dropped, subject.id, term)
return scraped_section_count, found_section_count
def _initialize_query(driver: SpireDriver, term_id: str, subject_id: str):
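    """Fill out the class-search form: select the term and subject, set the course number
    filter to "greater than or equal to" "A", and untick the open-classes-only checkbox.
    """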
for (select_id, value) in [
("UM_DERIVED_SA_UM_TERM_DESCR", term_id),
("CLASS_SRCH_WRK2_SUBJECT$108$", subject_id),
]:
element: WebElement = driver.wait_for_interaction(By.ID, select_id)
assert element
driver.scroll_to(element)
select = Select(element)
select.select_by_value(value)
driver.wait_for_spire()
number_select = Select(driver.find("CLASS_SRCH_WRK2_SSR_EXACT_MATCH1"))
number_select.select_by_visible_text("greater than or equal to")
driver.wait_for_spire()
number_input = driver.find("CLASS_SRCH_WRK2_CATALOG_NBR$8$")
assert number_input
number_input.clear()
number_input.send_keys("A")
driver.wait_for_spire()
open_input = driver.find("CLASS_SRCH_WRK2_SSR_OPEN_ONLY")
assert open_input
if open_input.is_selected():
open_input.click()
driver.wait_for_spire()
def scrape_sections(driver: SpireDriver, cache: VersionedCache, quick=False):
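    """Scrape sections term by term and subject by subject, resuming from the versioned
    cache; in quick mode, sections whose search-row data already matches the database
    are skipped.
    """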
log.info("Scraping sections...")
total_timer = Timer()
scraped_terms = 0
total_scraped = 0
total_found = 0
def get_option_values(select: WebElement) -> list[tuple[str, str]]:
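        """Collect (value, text) pairs for a select element's non-empty options."""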
return [
(e.get_property("value"), e.text)
for e in select.find_elements(By.CSS_SELECTOR, "option")
if len(e.get_property("value")) > 0 # type: ignore
]
driver.switch()
# Fetch subject option values
subject_select: WebElement = driver.wait_for_interaction(By.ID, "CLASS_SRCH_WRK2_SUBJECT$108$")
assert subject_select
subject_values: list[tuple[str, str]] = get_option_values(subject_select)
# Fetch term option values
term_select: WebElement = driver.wait_for_interaction(By.ID, "UM_DERIVED_SA_UM_TERM_DESCR")
assert term_select
term_values: list[tuple[str, str]] = get_option_values(term_select)
    # For each term, walking back one academic year (4 terms) from the most recently posted term
for term_offset in range(cache.get("term_offset", 4 * 1 - 1), -1, -1): # type: ignore
cache.push("term_offset", term_offset)
(term_id, flipped_term) = term_values[term_offset]
[year, season] = flipped_term.split(" ")
term = get_or_create_term(season, year)
# Establish coverage entry
coverage, _ = SectionCoverage.objects.get_or_create(
term=term,
defaults={"completed": False, "start_time": timezone.now()},
)
if coverage.completed and settings.SCRAPER_SKIP_OLD_TERMS:
year = int(year)
match season:
case "Fall":
end_date = datetime(year=year + 1, month=1, day=1)
case "Winter":
end_date = datetime(year=year + 1, month=2, day=15)
case "Spring":
end_date = datetime(year=year, month=6, day=1)
case "Summer":
end_date = datetime(year=year, month=9, day=15)
case _:
assert False
if coverage.end_time and timezone.make_aware(end_date) < coverage.end_time:
log.info("Skipping the %s term, as information is static.", term)
continue
log.info("Scraping sections during %s...", term)
term_timer = Timer()
# For each subject
for (subject_id, subject_title) in cache.skip_once(subject_values, "subject"):
cache.push("subject", (subject_id, subject_title))
subject_timer = Timer()
# Initialize and search for subject during term
_initialize_query(driver, term_id, subject_id)
raw_subject = RawSubject(subject_id, subject_title)
subject = raw_subject.push()
log.info("Searching for sections in subject %s during %s...", subject.id, term)
driver.click("CLASS_SRCH_WRK2_SSR_PB_CLASS_SRCH")
assert driver.find("win0divDERIVED_CLSRCH_SSR_CLASS_LBLlbl")
return_button = driver.find("CLASS_SRCH_WRK2_SSR_PB_NEW_SEARCH")
if return_button:
scraped, found = _scrape_search_results(driver, cache, quick, term, subject)
log.info(
"Scraped %s %s sections during %s in %s. Returning...",
scraped,
subject,
term,
subject_timer,
)
if quick:
total_scraped += scraped
total_found += found
log.info("Running skip percentage of: %.2f%%", (1 - (total_scraped / total_found)) * 100)
driver.click("CLASS_SRCH_WRK2_SSR_PB_NEW_SEARCH")
else:
error_message_span = driver.find("DERIVED_CLSMSG_ERROR_TEXT")
assert error_message_span
warning = error_message_span.text
if warning == "The search returns no results that match the criteria specified.":
log.info("There are no %s section during %s. Skipping.", subject, term)
else:
log.warning("Failed while searching: %s", error_message_span.text)
raise Exception(f"Unexpected search error: {warning}")
coverage.completed = True
coverage.end_time = timezone.now()
coverage.save()
log.info("Scraped sections for the %s term in %s.", term, term_timer)
log.info("Scraped %s terms in %s.", scraped_terms, total_timer)