Last active
November 23, 2024 18:29
-
-
Save IamMusavaRibica/7498bb043a8f5a7af618fa2b7c9b96ba to your computer and use it in GitHub Desktop.
eDnotifier source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
from collections import namedtuple | |
from typing import Optional, Any, Union, Callable | |
import lxml.etree | |
from lxml.etree import HTML | |
from datetime import date | |
from dataclasses import dataclass | |
try: import regex as re # regex is usually faster than standard re | |
except: import re | |
__title__ = 'ednevnik' | |
__author__ = 'https://github.com/IamMusavaRibica' | |
__license__ = 'MIT' | |
HOST = 'https://ocjene.skole.hr' | |
LOGIN = f'{HOST}/login' | |
CLASS = f'{HOST}/class' | |
EXAM = f'{HOST}/exam' # loads all exams | |
GRADE_ALL = f'{HOST}/grade/all' | |
DEFAULT_HEADERS = { | |
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36' | |
} | |
PATTERN1 = re.compile(r'\s\s+') # for removing multiple spaces, tabs and newlines | |
def _fix_text(string): return PATTERN1.sub(' ', string) | |
# namedtuples, if typehints aren't required | |
Grade = namedtuple('Grade', 'note date category grade') | |
Exam = namedtuple('Exam', 'subject note date') | |
Subject = namedtuple('Subject', 'name teacher grades exams href') | |
School = namedtuple('School', 'name city class_teacher class_ school_year') | |
@dataclass | |
class Grade: | |
note: str # biljeska | |
date: date # samo DD, MM, YYYY | |
category: Optional[str] # rubrika vrednovanja | |
grade: Optional[int] # ocjena, None ako je biljeska | |
@dataclass | |
class Exam: | |
subject: str | |
note: str | |
date: date | |
@dataclass | |
class Subject: | |
name: str | |
teacher: str | |
grades: list[Grade] | |
exams: Optional[list[Exam]] | |
href: str | |
@dataclass | |
class School: | |
name: str | |
city: str | |
class_teacher: str | |
class_: str # always uppercase | |
school_year: str | |
class Utils: | |
@staticmethod | |
def _extract_school_info(parser: lxml.etree.HTMLParser): | |
# you don't need to understand this | |
main_div = parser.xpath('//div[@class="school"]')[0] | |
class_div, school_name_div, _ = main_div.getchildren() | |
a1, a2 = class_div.getchildren() | |
class_ = a1.text.upper() | |
school_year = a2.text | |
a1, a2 = school_name_div.getchildren() | |
b1, b2 = a1.getchildren() | |
school_name = b1.text | |
school_city = b2.text.strip('; ') | |
_, b1 = a2.getchildren() | |
class_teacher = b1.text | |
return school_name, school_city, class_teacher, class_, school_year | |
@staticmethod | |
def _extract_grade_info(grade: lxml.etree.Element, s1, s2): | |
a1, a2, a3, a4 = grade.getchildren() | |
raw_date = a1[0].text | |
# python 3.8+ only | |
note = c2[0].text if (c2 := a2.getchildren()) else None | |
category = c3[0].text if (c3 := a3.getchildren()) else None | |
mark = c4[0].text if (c4 := a4.getchildren()) else None | |
date_ = Utils._convert_date(raw_date, s1, s2) | |
return note, date_, category, mark | |
@staticmethod | |
def _extract_exam_info(exam: lxml.etree.Element, s1, s2): | |
# if ValueError or TypeError occurs here, you might have done too much requests | |
assert exam is not None, 'Too much requests, please slow down' | |
subject, note, raw_date = map(lambda t: t[0].text, exam.getchildren()) | |
note = _fix_text(note) | |
return subject, note, Utils._convert_date(raw_date, s1, s2) | |
@staticmethod | |
def _convert_date(raw_date: str, first_half: int, second_half: int): | |
D, M = map(int, raw_date.strip('.').split('.')) # day and month (last character is a dot) | |
return date(day=D, month=M, year=2000 + (first_half if 9 <= M <= 12 else second_half)) | |
class EDNClient(Utils): | |
_session: aiohttp.ClientSession # set within factory function, passed to __init__ | |
subjects: dict[str, Subject] | |
tasks: dict[str, asyncio.Task] | |
school: Optional[School] | |
def __init__(self, session, load_grades, load_exams): | |
self._session = session | |
self.subjects = {} | |
self.tasks = {} | |
self.school = None # set during load_course | |
self.tasks['course'] = self._schedule_event(self._load_course, "load_course") | |
if load_grades: self.tasks['grades'] = self._schedule_event(self._load_grades, "load_grades") | |
if load_exams: self.tasks['exams'] = self._schedule_event(self._load_exams, "load_exams") | |
@classmethod # factory function | |
async def login(cls, email: str, password: str, headers=None, load_grades=False, load_exams=False): | |
def _get_csrf_token(html): | |
return HTML(html).xpath("//input[@name='csrf_token']")[0].attrib.get('value') | |
session = aiohttp.ClientSession(headers=headers or DEFAULT_HEADERS) | |
login_page = await (await session.get(LOGIN)).text() | |
csrf = _get_csrf_token(login_page) | |
login = await session.post( | |
LOGIN, data={ | |
'username': email, | |
'password': password, | |
'csrf_token': csrf | |
}, allow_redirects=False) | |
session.cookie_jar.update_cookies({'cnOcjene': login.cookies['cnOcjene'].value}) | |
return cls(session, load_grades, load_exams) | |
async def _load_course(self, initial_html=None): | |
if initial_html is None: | |
course = await self._session.get(CLASS) # idk why /course doesn't work here | |
initial_html = await course.text() | |
parser = HTML(initial_html) | |
# moved to a separate method | |
self.school = School(*self._extract_school_info(parser)) | |
# fetch subjects | |
subjects = parser.xpath('//div[@class="content"]/ul')[0] | |
for s in subjects.iterchildren(): | |
a = s[0] | |
name, teacher, _ = a.getchildren() | |
name = name.text | |
teacher = _fix_text(teacher.text.strip()) | |
href = a.attrib['href'] | |
self.subjects[str(name)] = Subject(name, teacher, [], [], href) | |
async def _load_grades(self): | |
course = await self._session.get(GRADE_ALL) | |
parser = HTML(await course.text()) | |
table = parser.xpath('//div[@class="content"]')[0] | |
# we loaded the grades, now need to wait to get subjects set | |
await self.tasks['course'] | |
# split the school year into two semesters | |
s1, s2 = map(int, self.school.school_year.split('/')) | |
for subject in table.iterchildren(): | |
grades = subject.getchildren() | |
# hope for compatibility (two places where subject name appears) | |
try: subject_name = grades[0].attrib['data-action-id'] | |
except: subject_name = grades[0][0].text | |
for grade in grades[-1:1:-1]: | |
note, date_, category, mark = self._extract_grade_info(grade, s1, s2) | |
self.subjects[subject_name].grades.append(Grade(note, date_, category, int(mark) if mark else None)) | |
async def _load_exams(self): | |
exams = await self._session.get(EXAM, allow_redirects=False) | |
# rarely, this might be a redirect to /class, with an error stating | |
# a class was not selected and exams couldn't be displayed | |
for _ in range(5): | |
if exams.status != 302: break | |
await asyncio.sleep(1) | |
exams = await self._session.get(EXAM, allow_redirects=False) | |
else: | |
raise RuntimeError('couldn\'t fetch /exam after 5 retries (got a redirect)') | |
parser = HTML(await exams.text()) | |
table = parser.xpath('//div[@class="content"]')[0] | |
await self.tasks['course'] | |
s1, s2 = map(int, self.school.school_year.split('/')) | |
# grouped by months | |
for parent in table.iterchildren(): | |
children = parent.getchildren() | |
for exam_ in children[2:]: | |
try: | |
subject, note, date_ = self._extract_exam_info(exam_, s1, s2) | |
except Exception as e: | |
# This is pretty rare. | |
with open('last_exam.html', 'w', encoding='utf-8') as f: f.write(await exams.text()) | |
raise RuntimeError('eDnevnik unknown error. Webpage instance saved at /last_error.html') from e | |
if subject not in self.subjects: # this should not happen | |
raise RuntimeError(f"Loaded an exam for invalid subject '{subject}': [{date_}] '{note}'") | |
self.subjects[subject].exams.append(Exam(subject, note, date_)) | |
def _schedule_event(self, coro, event_name=None, *args, **kwargs): | |
wrapped = self._run_event(coro, *args, **kwargs) | |
return asyncio.create_task(wrapped, name=event_name or "None") | |
async def _run_event(self, coro, event_name=None, *args, **kwargs): | |
try: | |
await coro(*args, **kwargs) | |
except asyncio.CancelledError: pass | |
except Exception as e: | |
# print(f"Exception in event '{event_name}': {e}") | |
raise | |
async def close(self): | |
await self._session.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import discord | |
from discord.ext import tasks | |
from datetime import date, time, datetime | |
from ednevnik import EDNClient | |
USERNAME = '[email protected]' | |
PASSWORD = '1234567890' | |
CHANNEL_ID = 901462505237413898 | |
ROLE_ID = 933447083527135362 # officially the role the bot will mention | |
BOT_TOKEN = '' | |
def check_new_exam(subject: str, note: str, date_: date) -> bool: | |
# Custom function that should check whether an exam is new | |
# and return the appropriate boolean value. This function | |
# depends on what type of database is used to store information | |
# about exams that have been registered before. | |
... | |
bot = discord.Bot() | |
target_channel: discord.TextChannel # set after on_ready | |
@tasks.loop(minutes=3) # Can be adjusted | |
async def checker(): | |
client = await EDNClient.login(USERNAME, PASSWORD, load_exams=True) | |
await client.tasks['exams'] | |
for subject in client.subjects.values(): | |
for exam in subject.exams: | |
if check_new_exam(subject.name, exam.note, exam.date): | |
e = discord.Embed( | |
title="Novi ispit!", | |
description=f"```less\n[{exam.date}] {exam.subject} ({exam.note})```", | |
timestamp=datetime.now(), | |
) | |
e.set_footer(text='\u200b') | |
timestamp = datetime.combine(exam.date, time(hour=3)).timestamp() | |
await target_channel.send( | |
f"<@&{ROLE_ID}> <t:{round(timestamp)}:R>", | |
embed=e | |
) | |
# also want to store the newly added exam into the database | |
await client.close() | |
@checker.before_loop | |
async def checker_setup(): | |
global target_channel | |
target_channel = bot.get_channel(CHANNEL_ID) or await bot.fetch_channel(CHANNEL_ID) | |
@bot.event | |
async def on_ready(): | |
checker.start() | |
assert (USERNAME, PASSWORD) != ('[email protected]', '1234567890'), 'Did you forget to set your credentials?' | |
bot.run(BOT_TOKEN) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
lxml | |
aiohttp |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment