Last active
June 12, 2022 05:45
-
-
Save opabravo/05de673cc519c469e26d10fcbfba8f46 to your computer and use it in GitHub Desktop.
A cog of my Multifunctional Discord BOT: "Elearning System on Discord"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A cog of my Multifunctional Discord BOT: "Elearning System on Discord", | |
I think it's a good practice to prevent SQLIs in python codes, | |
by wrapping the queries with functions which only accept static type args, | |
also for explicit permission managing and safe structure...hope so :)) | |
""" | |
import asyncio | |
import datetime | |
import io | |
import os | |
import random | |
import re | |
import smtplib | |
from email.header import Header | |
from email.mime.text import MIMEText | |
from email.utils import formataddr | |
from os.path import join, dirname | |
import asyncpg | |
import discord | |
from discord.ext import commands | |
from dotenv import load_dotenv | |
from cogs.utils import checks, words, cache | |
dotenv_path = join(dirname(__file__), '.env') | |
load_dotenv(dotenv_path) | |
TYPING = '<a:typing:610062947141812224>' | |
BULLET = '\N{BULLET} ' | |
GUILDS = [375966733896515584, 888406032127242280, 977764812480217148] | |
EMAIL_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' | |
def color_rnd(): | |
return random.randint(0, 0xffffff) | |
async def get_or_create_cat(guild: discord.Guild, name: str) -> discord.CategoryChannel: | |
return discord.utils.find(lambda c: c.name == name, guild.categories) or await guild.create_category(name=name, position=0) | |
async def get_or_create_role(guild: discord.Guild, name: str, color=0x0000ff) -> discord.Role: | |
return discord.utils.find(lambda r: r.name == name, guild.roles) or await guild.create_role(name=name, colour=color) | |
async def cat_create_text_channels(category: discord.CategoryChannel, *channel_names): | |
for channel in channel_names: | |
if not discord.utils.find(lambda c: c.name == channel, category.text_channels): | |
await category.create_text_channel(channel, topic="") | |
class Elearn(commands.Cog): | |
def __init__(self, bot): | |
self.bot = bot | |
self.email_server = None | |
self.GUILDS = [] | |
self.bot.loop.create_task(self.db_create_pool()) | |
async def db_create_pool(self): | |
if not hasattr(self.bot, 'school_pool'): | |
self.bot.school_pool = await asyncpg.create_pool( | |
host=os.environ.get("PG_HOST"), | |
port=os.environ.get("PG_PORT"), | |
user=os.environ.get("PG_USER"), | |
password=os.environ.get("PG_PASSWORD"), | |
database=os.environ.get("PG_DATABASE"), | |
) | |
# async def cog_check(self, ctx): | |
# if ctx.message.guild: | |
# data = await self.fetch_courses_row(ctx.message.guild.id) | |
# return data | |
# # return ctx.message.guild.id in GUILDS | |
# def cog_unload(self): | |
# self.bot.school_pool.close() | |
def send_email(self, recipient: str, code: int): | |
message = MIMEText(f'<h1 style="color:red;font-size:100px;">{code}<h1>', 'html', 'utf-8') | |
message['Subject'] = Header('驗證碼 :', "utf-8") | |
message['From'] = formataddr((str(Header('遠距教學Discord', 'utf-8')), '[email protected]')) | |
message['To'] = recipient | |
try: | |
with smtplib.SMTP_SSL("smtp.zoho.com", 465) as server: | |
server.login(os.environ.get("MAIL_USER"), os.environ.get("MAIL_PASSWD")) | |
server.sendmail('[email protected]', recipient, message.as_string()) | |
except Exception as e: | |
raise | |
def send_verify_code(self, email: str): | |
code = random.randint(10000, 99999) | |
# Debug | |
self.send_email(email, code) | |
return code | |
async def handle_reactions(self, msg: discord.Message, user: discord.Member, timeout: int, *emojis) -> str: | |
async def add_reactions_task(): | |
for emoji in emojis: | |
await msg.add_reaction(emoji) | |
self.bot.loop.create_task(add_reactions_task()) | |
def check(reaction, member): | |
if str(reaction.emoji) not in emojis: | |
return | |
if reaction.message.id == msg.id and member == user: | |
return True | |
try: | |
result = await self.bot.wait_for('reaction_add', timeout=timeout, check=check) | |
except asyncio.TimeoutError: | |
return '' | |
finally: | |
try: | |
await msg.clear_reactions() | |
except discord.errors.NotFound: | |
return '' | |
return str(result[0].emoji) | |
async def handle_response(self, channel: discord.TextChannel, author: discord.Member, timeout: int = 300) -> str: | |
try: | |
response = await self.bot.wait_for( | |
'message', check=lambda m: m.author == author and m.channel == channel, timeout=timeout | |
) | |
except asyncio.TimeoutError: | |
return '' | |
return response.content | |
# @staticmethod | |
# async def give_perms(member: discord.Member): | |
# await member.remove_roles(*[discord.Object(id=x) for x in ROLES_TO_REMOVE]) | |
# await member.add_roles(*[discord.Object(id=x) for x in ROLES_TO_GIVE]) | |
@staticmethod | |
async def _init_teacher_role(guild: discord.Guild): | |
teacher_role = await get_or_create_role(guild, '老師', 0x6a0dad) | |
await guild.owner.add_roles(teacher_role) | |
async def _init_channels_and_perms(self, guild: discord.Guild): | |
if not discord.utils.find(lambda c: c.name == "學生驗證", guild.text_channels): | |
await guild.create_text_channel(name="學生驗證", position=0) | |
info_cat = await get_or_create_cat(guild, "▬▬▬ 全域資訊 ▬▬▬") | |
await cat_create_text_channels(info_cat, "公告", "教學規範", "課程說明") | |
class_cat = await get_or_create_cat(guild, "▬▬▬ 課程區 ▬▬▬") | |
await cat_create_text_channels(class_cat, "上課頻道", "討論區") | |
etc_cat = await get_or_create_cat(guild, "▬▬▬ 其他區 ▬▬▬") | |
await cat_create_text_channels(etc_cat, "指令頻道") | |
if not discord.utils.find(lambda c: c.name == "上課語音", class_cat.voice_channels): | |
await class_cat.create_voice_channel(name="上課語音") | |
await self._init_perms(class_cat, etc_cat) | |
@staticmethod | |
async def _init_perms(*channels): | |
guild = channels[0].guild | |
student_role = await get_or_create_role(guild, '學生') | |
unverified_role = await get_or_create_role(guild, '未驗證', 0xFF6600) | |
verify_chan = discord.utils.find(lambda c: c.name == "學生驗證", guild.text_channels) | |
await verify_chan.set_permissions(student_role, read_messages=False) | |
await verify_chan.set_permissions(unverified_role, read_messages=True) | |
await verify_chan.set_permissions(guild.default_role, read_messages=True) | |
info_cat = await get_or_create_cat(guild, "▬▬▬ 全域資訊 ▬▬▬") | |
await info_cat.set_permissions(guild.default_role, send_messages=False) | |
for c in channels: | |
await c.set_permissions(guild.default_role, read_messages=False) | |
await c.set_permissions(student_role, read_messages=True) | |
async def _prompt_init(self, ctx): | |
# await ctx.message.add_reaction('\u2705') | |
# await ctx.send("已完成初始頻道設置\n請到 #教學規範、#課程說明 張貼您的課程資訊") | |
msg = await ctx.reply("接下來將請您輸入這門課程的相關資訊") | |
emoji = await self.handle_reactions(msg, ctx.author, 60, '\N{WHITE HEAVY CHECK MARK}') | |
if not emoji: | |
await msg.edit(f'{msg.content}\n:x: (已逾時)') | |
return | |
sql_data = {'server_id': ctx.guild.id, 'teacher_id': ctx.author.id, 'school_domain': None, | |
'start_time': None, 'end_time': None} | |
questions = {'school_domain': "學生的eamil域名(Ex: `cc.takming.edu.tw`) :", | |
'course_weekday': "課程開始星期?(格式範例: `1`、`3`、`7`) :", | |
'start_time': "課程開始?(格式範例: `9:20`、`13:20`) :", | |
'end_time': "課程結束?(格式範例: `12:10`、`16:10`) :"} | |
hh_mm_converter = lambda h, m: datetime.datetime.now().replace(hour=int(h), minute=int(m), second=0) | |
week_day = 0 | |
for q_id, q_str in questions.items(): | |
msg = await ctx.reply(q_str) | |
resp = await self.handle_response(ctx.channel, ctx.author) | |
if resp == '': | |
await msg.edit(f'{msg.content} (已逾時)') | |
return await msg.add_reaction('\u274C') | |
if q_id == 'course_weekday': | |
try: | |
week_day = int(resp) | |
continue | |
except ValueError: | |
return await msg.reply('格式錯誤!') | |
except Exception: | |
return await msg.reply('未知錯誤!') | |
if 'time' in q_id: | |
data = resp.split(":") | |
sql_data[q_id] = hh_mm_converter(data[0], data[1]) | |
continue | |
sql_data[q_id] = resp | |
# Dummy algorithm to generate datetime by weekday, bad practice i know, pls dont shit on me | |
# so that i dont have to refactor code and insert additional columns into db | |
def gen_date_by_weekday(date: datetime.datetime, weekday: int): | |
for i in range(1, 20): | |
modified_date = date.replace(day=i) | |
if (modified_date.weekday() + 1) == weekday: | |
return modified_date | |
for column in ('start_time', 'end_time'): | |
sql_data[column] = gen_date_by_weekday(sql_data.get(column), week_day) | |
return sql_data | |
@checks.is_admin() | |
@commands.group() | |
async def elearn(self, ctx): | |
"""Discord遠距教學系統,使用g!help [指令]來獲取幫助""" | |
if ctx.invoked_subcommand is None: | |
await ctx.send_help('elearn') | |
async def fetch_messangers(self, channel: discord.TextChannel, msg_start: discord.Message, | |
msg_end: discord.Message): | |
records = [] | |
async for msg in channel.history(limit=2000, before=msg_end, after=msg_start): | |
# Skip admins | |
if msg.author.guild_permissions.manage_guild: | |
continue | |
try: | |
records.append(msg.author.id) | |
except Exception: | |
pass | |
return records | |
@elearn.command() | |
async def stg(self, ctx, minutes: int = None): | |
"""手動點名功能""" | |
if not minutes: | |
msg = await ctx.reply("請輸入開放點名的時間(分鐘),介於 `1 ~ 10`") | |
resp = await self.handle_response(ctx.channel, ctx.author) | |
if resp == '': | |
return await msg.edit(f"{msg}\n已逾時..") | |
try: | |
minutes = int(resp) | |
assert 1 <= minutes <= 10 | |
except ValueError: | |
return await ctx.reply("請輸入數字") | |
except AssertionError: | |
return await ctx.reply("請輸入介於**`1 ~ 10`的數字**") | |
except Exception: | |
raise | |
now_min = datetime.datetime.now() | |
final_min = now_min + datetime.timedelta(minutes=minutes) | |
title = f"已開始計時點名 : `{now_min.hour}:{now_min.minute}` ~ `{final_min.hour}:{final_min.minute}`" | |
embed = discord.Embed(colour=0x00ff00, title=title) | |
msg_start = await ctx.send(embed=embed) | |
await self.handle_response(ctx.channel, ctx.author, minutes * 60) | |
embed.title, embed.colour = title.replace('已開始', '已結束'), 0xff0000 | |
msg_end = await ctx.send(embed=embed) | |
records = await self.fetch_messangers(ctx.channel, msg_start, msg_end) | |
student_ids = [] | |
for user_id in records: | |
if r := await self.fetch_students_row(ctx.guild.id, user_id): | |
student_ids.append(r.get('email').split('@')[0]) | |
replied_students = '\n'.join(student_ids) | |
embed.description = f'**已點名學生**:\n```fix\n{replied_students}```' | |
await msg_end.edit(embed=embed) | |
# await self.record_attendance() | |
async def sql_table_print_att(self, ctx, date: datetime.datetime): | |
from cogs.utils.formats import TabularData | |
results = await self.fetch_attendance_rows(ctx.guild.id, date) | |
headers = list(results[0].keys()) | |
table = TabularData() | |
table.set_columns(headers) | |
table.add_rows(list(r.values()) for r in results) | |
render = table.render() | |
fmt = f'```\n{render}\n```' | |
if len(fmt) > 2000: | |
fp = io.BytesIO(fmt.encode('utf-8')) | |
await ctx.send('Too many results...', file=discord.File(fp, 'results.txt')) | |
else: | |
await ctx.send(fmt) | |
@elearn.command() | |
async def att(self, ctx, time: str = None): | |
"""根據日期顯示自動點名資訊,直接列sql table | |
Example: g!elearn stg 2022-5-12""" | |
if not time: | |
msg = await ctx.reply("請輸入上課的絕對日期,Ex: `2022-05-12`") | |
resp = await self.handle_response(ctx.channel, ctx.author) | |
if resp == '': | |
return await msg.edit(f"{msg}\n已逾時..") | |
time = resp | |
try: | |
date = datetime.datetime.strptime(time, "%Y-%m-%d") | |
except ValueError: | |
return await ctx.reply('日期格式錯誤') | |
embed = discord.Embed(colour=0x00ff00, title='# TODO: DATA Visualization') | |
try: | |
await self.sql_table_print_att(ctx, date) | |
except IndexError: | |
embed.title = f'無此紀錄! - `{date}`' | |
except Exception: | |
raise | |
await ctx.reply(embed=embed) | |
@elearn.command() | |
async def init(self, ctx: commands.Context): | |
"""申請並設置遠距教學服務""" | |
await self._init_teacher_role(ctx.guild) | |
await self._init_channels_and_perms(ctx.guild) | |
try: | |
sql_data = await self._prompt_init(ctx) | |
except IndexError: | |
return await ctx.reply("格式錯誤!") | |
if not sql_data: | |
return | |
# today = datetime.datetime.now() | |
# sql_data = {'server_id': ctx.guild.id, 'teacher_id': ctx.author.id, 'school_domain': '', | |
# 'start_time': today.replace(hour=9, minute=59, second=0), | |
# 'end_time': today.replace(hour=12, minute=59, second=0)} | |
await self.record_courses(sql_data) | |
await ctx.reply("已成功設定!") | |
@elearn.command() | |
async def uninit(self, ctx: commands.Context): | |
"""註銷遠距教學服務""" | |
guild = ctx.guild | |
categories = ["▬▬▬ 全域資訊 ▬▬▬", "▬▬▬ 課程區 ▬▬▬", "▬▬▬ 其他區 ▬▬▬"] | |
categories = [discord.utils.get(guild.categories, name=x) for x in categories] | |
if v_c := discord.utils.find(lambda c: c.name == "學生驗證", guild.text_channels): | |
await v_c.delete() | |
for cat in categories: | |
if cat.voice_channels: | |
await discord.utils.find(lambda c: c.name == "上課語音", cat.voice_channels).delete() | |
for c in cat.text_channels: | |
await c.delete() | |
for cat in categories: | |
await cat.delete() | |
await ctx.message.add_reaction('\u2705') | |
async def record_attendance(self, user: discord.Member, time: datetime.datetime, lesson: int, is_in_vc: bool): | |
async with self.bot.school_pool.acquire() as conn: | |
sql = "INSERT INTO attendance ( student_id, server_id, lesson, time, is_in_vc ) VALUES ($1, $2, $3, $4, $5)" \ | |
"RETURNING id" | |
att_id = await conn.execute(sql, user.id, user.guild.id, lesson, time, is_in_vc) | |
# self.fetch_attendance_rows.invalidate(self, att_id) | |
async def record_students(self, channel: discord.TextChannel, user: discord.Member, email: str): | |
# sourcery skip: assign-if-exp, swap-if-expression | |
info = await self.fetch_students_row(channel.guild.id, user.id) | |
async with self.bot.school_pool.acquire() as conn: | |
if not info: | |
sql = "INSERT INTO students ( student_id, server_id, email ) VALUES ($1, $2, $3) " | |
else: | |
sql = "UPDATE students SET email=$3 WHERE student_id=$1 and server_id=$2" | |
await conn.execute(sql, user.id, channel.guild.id, email) | |
# self.fetch_students_row.invalidate(self, server_id=channel.guild.id) | |
async def record_courses(self, sql_data: dict): | |
async with self.bot.school_pool.acquire() as conn: | |
sql = "INSERT INTO courses ( server_id, teacher_id, school_domain, start_time, end_time ) " \ | |
"VALUES ($1, $2, $3, $4, $5) " \ | |
"ON CONFLICT(server_id) " \ | |
"DO UPDATE SET teacher_id=$2, school_domain=$3, start_time=$4, end_time=$5 " \ | |
"RETURNING course_id" | |
# print(sql) | |
# print(sql_data.values()) | |
course_id = await conn.fetchval(sql, *sql_data.values()) | |
print(f"{course_id=}") | |
# self.fetch_courses_row.invalidate(self, server_id=sql_data.get('server_id')) | |
# @cache.cache() | |
async def fetch_courses_row(self, server_id: int): | |
query = """SELECT * FROM courses WHERE server_id=$1;""" | |
async with self.bot.school_pool.acquire(timeout=300.0) as con: | |
return await con.fetchrow(query, server_id) | |
# @cache.cache() | |
async def fetch_students_row(self, server_id: int, user_id: int = None): | |
if user_id: | |
query = """SELECT * FROM students WHERE server_id=$1 and student_id=$2""" | |
else: | |
query = """SELECT * FROM students WHERE server_id=$1""" | |
async with self.bot.school_pool.acquire(timeout=300.0) as con: | |
return await con.fetchrow(query, server_id, user_id) | |
# @cache.cache() | |
async def fetch_attendance_rows(self, server_id: int, date: datetime.datetime): | |
query = """SELECT student_id, lesson, time, is_in_vc FROM attendance | |
WHERE server_id=$1 and date(time) = $2""" | |
async with self.bot.school_pool.acquire(timeout=300.0) as con: | |
return await con.fetch(query, server_id, date) | |
# @cache.cache() | |
# async def fetch_attendance_rows(self, server_id: int): | |
# query = """SELECT * FROM attendance WHERE server_id=$1;""" | |
# async with self.bot.school_pool.acquire(timeout=300.0) as con: | |
# return await con.fetchrow(query, server_id) | |
@staticmethod | |
async def _give_perms(user: discord.Member): | |
student_role = await get_or_create_role(user.guild, '學生') | |
unverified_role = await get_or_create_role(user.guild, '未驗證', 0xFF6600) | |
await user.remove_roles(unverified_role) | |
await user.add_roles(student_role) | |
async def verify(self, channel: discord.TextChannel, user: discord.Member): | |
# sourcery skip: assign-if-exp, swap-if-expression | |
info = await self.fetch_courses_row(channel.guild.id) | |
valid_email_domain = info['school_domain'] | |
await channel.send(f"{user.mention} 請輸入**學校email**以供驗證 (`XXXXXXXXX@{valid_email_domain}`) {TYPING}") | |
resp = await self.handle_response(channel, user) | |
email = resp.lower() | |
if not resp: | |
return await channel.send(f"{user.mention} 逾時") | |
if not re.fullmatch(EMAIL_REGEX, email): | |
return await channel.send(f"{user.mention} 格式不符!") | |
if not resp.endswith(f"@{valid_email_domain}"): | |
return await channel.send(f"{user.mention} email域名不符!") | |
await channel.send(f"{user.mention}\n已寄驗證碼到此信箱,請檢察\n輸入驗證碼 {TYPING}") | |
code = self.send_verify_code(email) | |
# debug | |
print(code) | |
resp = await self.handle_response(channel, user) | |
if resp == '': | |
return await channel.send(f"{user.mention} 驗證碼輸入逾時") | |
if str(code) not in resp: | |
return await channel.send(f"{user.mention} 驗證碼錯誤") | |
await self.record_students(channel, user, email) | |
await channel.send(f"`{user.id}` - {user.mention}\n已完成驗證") | |
try: | |
await self._give_perms(user) | |
finally: | |
embed = words.quote_embed(user) | |
await user.send(embed=embed) | |
# data = {"user_id": user.id, "server_id": channel.guild.id, "email": email} | |
# await channel.send( | |
# f"{user.mention},您已完成驗證\n將記錄到資料表 - `student`\n```json\n{json.dumps(data, indent=4, ensure_ascii=False)}```" | |
# ) | |
@commands.Cog.listener() | |
async def on_message(self, message: discord.Message): | |
if not message.guild: | |
return | |
# Check if teacher have registered elearn | |
data = await self.fetch_courses_row(message.guild.id) | |
if not data: | |
return | |
channel = message.channel | |
if channel.name != "學生驗證": | |
return | |
if message.content.lower() in ('verify', '驗證'): | |
await self.verify(channel, message.author) | |
@commands.Cog.listener() | |
async def on_member_join(self, member: discord.Member): | |
if not member.guild: | |
return | |
# Check if teacher have registered elearn | |
data = await self.fetch_courses_row(server_id=member.guild.id) | |
print(f"{data=}") | |
if not data: | |
return | |
record = await self.fetch_students_row(member.guild.id, member.id) | |
unverified_role = await get_or_create_role(member.guild, '未驗證', 0xFF6600) | |
student_role = await get_or_create_role(member.guild, '學生') | |
if not record: | |
await member.add_roles(unverified_role) | |
if verify_chan := discord.utils.find(lambda c: c.name == "學生驗證", member.guild.text_channels): | |
await self.verify(verify_chan, member) | |
return | |
await member.add_roles(student_role) | |
async def filter_voice_event(self, member, before, after): | |
if before.channel == after.channel: | |
return | |
# Check if teacher have registered elearn | |
data = await self.fetch_courses_row(member.guild.id) | |
if not data: | |
return | |
before_channel = getattr(before, 'channel', []) | |
after_channel = getattr(after, 'channel', []) | |
# if not after.channel: | |
# return | |
if after_channel and after_channel.category.name != '▬▬▬ 課程區 ▬▬▬' and after_channel.name != '上課語音': | |
return | |
if before_channel and before_channel.category.name != '▬▬▬ 課程區 ▬▬▬' and before_channel.name != '上課語音': | |
return | |
# if member not in after.channel.members: | |
# return | |
course_record = await self.fetch_courses_row(member.guild.id) | |
if not course_record: | |
return | |
start_time: datetime.datetime = course_record['start_time'] | |
end_time: datetime.datetime = course_record['end_time'] | |
now: datetime.datetime = datetime.datetime.now() | |
now_weekday, att_weekday = now.weekday() + 1, start_time.weekday() + 1 | |
# Start Filtering | |
if att_weekday != now_weekday: | |
return | |
# print('Passed WeekDat Check') | |
now_hour, att_start_hour, att_end_hour = now.hour, start_time.hour, end_time.hour | |
# att_start_hour, att_end_hour = 18, 21 | |
if now_hour not in range(att_start_hour, att_end_hour + 1): | |
return | |
# print('Passed Hour Check') | |
# now_minute, att_start_minute, att_end_minute = now.minute, start_time.minute, end_time.minute | |
# att_start_minute, att_end_minute = 46, 40 | |
lesson = att_end_hour - att_start_hour | |
return after_channel, lesson, now | |
@commands.Cog.listener() | |
async def on_voice_state_update(self, member, before, after: discord.VoiceState): | |
"""紀錄出缺勤""" | |
data = await self.filter_voice_event(member, before, after) | |
if not data: | |
return | |
after_channel, lesson, now = data | |
# print( | |
# f"MINUTE RANGE ATTENDANCE START: {range(att_start_minute - random.randint(3, 5), att_start_minute + random.randint(3, 5))}") | |
### 這演算法對我這種腦殘來說要想很久 | |
# def valid_minute_range(now_minute, boundary_minute): | |
# return now_minute in range(boundary_minute-random.randint(3, 7), boundary_minute+random.randint(3, 7)) | |
# print(f"{after_channel.members=}") | |
# When user joins channel in learning hours | |
if after_channel and member in after.channel.members: | |
# print('User join channel') | |
# if valid_minute_range(now_minute, att_start_minute): | |
print(f'Join elearn channel : {member}') | |
return await self.record_attendance(member, now, lesson, True) | |
else: | |
# When user leaves channel in learning hours | |
print(f'Left elearn channel : {member}') | |
return await self.record_attendance(member, now, lesson, False) | |
def setup(bot): | |
bot.add_cog(Elearn(bot)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment