Skip to content

Instantly share code, notes, and snippets.

@opabravo
Last active June 12, 2022 05:45
Show Gist options
  • Save opabravo/05de673cc519c469e26d10fcbfba8f46 to your computer and use it in GitHub Desktop.
Save opabravo/05de673cc519c469e26d10fcbfba8f46 to your computer and use it in GitHub Desktop.
A cog of my Multifunctional Discord BOT: "Elearning System on Discord"
"""
A cog of my Multifunctional Discord BOT: "Elearning System on Discord",
I think it's a good practice to prevent SQLIs in python codes,
by wrapping the queries with functions which only accept static type args,
also for explicit permission managing and safe structure...hope so :))
"""
import asyncio
import datetime
import io
import os
import random
import re
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr
from os.path import join, dirname
import asyncpg
import discord
from discord.ext import commands
from dotenv import load_dotenv
from cogs.utils import checks, words, cache
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
TYPING = '<a:typing:610062947141812224>'
BULLET = '\N{BULLET} '
GUILDS = [375966733896515584, 888406032127242280, 977764812480217148]
EMAIL_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
def color_rnd():
return random.randint(0, 0xffffff)
async def get_or_create_cat(guild: discord.Guild, name: str) -> discord.CategoryChannel:
return discord.utils.find(lambda c: c.name == name, guild.categories) or await guild.create_category(name=name, position=0)
async def get_or_create_role(guild: discord.Guild, name: str, color=0x0000ff) -> discord.Role:
return discord.utils.find(lambda r: r.name == name, guild.roles) or await guild.create_role(name=name, colour=color)
async def cat_create_text_channels(category: discord.CategoryChannel, *channel_names):
for channel in channel_names:
if not discord.utils.find(lambda c: c.name == channel, category.text_channels):
await category.create_text_channel(channel, topic="")
class Elearn(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.email_server = None
self.GUILDS = []
self.bot.loop.create_task(self.db_create_pool())
async def db_create_pool(self):
if not hasattr(self.bot, 'school_pool'):
self.bot.school_pool = await asyncpg.create_pool(
host=os.environ.get("PG_HOST"),
port=os.environ.get("PG_PORT"),
user=os.environ.get("PG_USER"),
password=os.environ.get("PG_PASSWORD"),
database=os.environ.get("PG_DATABASE"),
)
# async def cog_check(self, ctx):
# if ctx.message.guild:
# data = await self.fetch_courses_row(ctx.message.guild.id)
# return data
# # return ctx.message.guild.id in GUILDS
# def cog_unload(self):
# self.bot.school_pool.close()
def send_email(self, recipient: str, code: int):
message = MIMEText(f'<h1 style="color:red;font-size:100px;">{code}<h1>', 'html', 'utf-8')
message['Subject'] = Header('驗證碼 :', "utf-8")
message['From'] = formataddr((str(Header('遠距教學Discord', 'utf-8')), '[email protected]'))
message['To'] = recipient
try:
with smtplib.SMTP_SSL("smtp.zoho.com", 465) as server:
server.login(os.environ.get("MAIL_USER"), os.environ.get("MAIL_PASSWD"))
server.sendmail('[email protected]', recipient, message.as_string())
except Exception as e:
raise
def send_verify_code(self, email: str):
code = random.randint(10000, 99999)
# Debug
self.send_email(email, code)
return code
async def handle_reactions(self, msg: discord.Message, user: discord.Member, timeout: int, *emojis) -> str:
async def add_reactions_task():
for emoji in emojis:
await msg.add_reaction(emoji)
self.bot.loop.create_task(add_reactions_task())
def check(reaction, member):
if str(reaction.emoji) not in emojis:
return
if reaction.message.id == msg.id and member == user:
return True
try:
result = await self.bot.wait_for('reaction_add', timeout=timeout, check=check)
except asyncio.TimeoutError:
return ''
finally:
try:
await msg.clear_reactions()
except discord.errors.NotFound:
return ''
return str(result[0].emoji)
async def handle_response(self, channel: discord.TextChannel, author: discord.Member, timeout: int = 300) -> str:
try:
response = await self.bot.wait_for(
'message', check=lambda m: m.author == author and m.channel == channel, timeout=timeout
)
except asyncio.TimeoutError:
return ''
return response.content
# @staticmethod
# async def give_perms(member: discord.Member):
# await member.remove_roles(*[discord.Object(id=x) for x in ROLES_TO_REMOVE])
# await member.add_roles(*[discord.Object(id=x) for x in ROLES_TO_GIVE])
@staticmethod
async def _init_teacher_role(guild: discord.Guild):
teacher_role = await get_or_create_role(guild, '老師', 0x6a0dad)
await guild.owner.add_roles(teacher_role)
async def _init_channels_and_perms(self, guild: discord.Guild):
if not discord.utils.find(lambda c: c.name == "學生驗證", guild.text_channels):
await guild.create_text_channel(name="學生驗證", position=0)
info_cat = await get_or_create_cat(guild, "▬▬▬ 全域資訊 ▬▬▬")
await cat_create_text_channels(info_cat, "公告", "教學規範", "課程說明")
class_cat = await get_or_create_cat(guild, "▬▬▬ 課程區 ▬▬▬")
await cat_create_text_channels(class_cat, "上課頻道", "討論區")
etc_cat = await get_or_create_cat(guild, "▬▬▬ 其他區 ▬▬▬")
await cat_create_text_channels(etc_cat, "指令頻道")
if not discord.utils.find(lambda c: c.name == "上課語音", class_cat.voice_channels):
await class_cat.create_voice_channel(name="上課語音")
await self._init_perms(class_cat, etc_cat)
@staticmethod
async def _init_perms(*channels):
guild = channels[0].guild
student_role = await get_or_create_role(guild, '學生')
unverified_role = await get_or_create_role(guild, '未驗證', 0xFF6600)
verify_chan = discord.utils.find(lambda c: c.name == "學生驗證", guild.text_channels)
await verify_chan.set_permissions(student_role, read_messages=False)
await verify_chan.set_permissions(unverified_role, read_messages=True)
await verify_chan.set_permissions(guild.default_role, read_messages=True)
info_cat = await get_or_create_cat(guild, "▬▬▬ 全域資訊 ▬▬▬")
await info_cat.set_permissions(guild.default_role, send_messages=False)
for c in channels:
await c.set_permissions(guild.default_role, read_messages=False)
await c.set_permissions(student_role, read_messages=True)
async def _prompt_init(self, ctx):
# await ctx.message.add_reaction('\u2705')
# await ctx.send("已完成初始頻道設置\n請到 #教學規範、#課程說明 張貼您的課程資訊")
msg = await ctx.reply("接下來將請您輸入這門課程的相關資訊")
emoji = await self.handle_reactions(msg, ctx.author, 60, '\N{WHITE HEAVY CHECK MARK}')
if not emoji:
await msg.edit(f'{msg.content}\n:x: (已逾時)')
return
sql_data = {'server_id': ctx.guild.id, 'teacher_id': ctx.author.id, 'school_domain': None,
'start_time': None, 'end_time': None}
questions = {'school_domain': "學生的eamil域名(Ex: `cc.takming.edu.tw`) :",
'course_weekday': "課程開始星期?(格式範例: `1`、`3`、`7`) :",
'start_time': "課程開始?(格式範例: `9:20`、`13:20`) :",
'end_time': "課程結束?(格式範例: `12:10`、`16:10`) :"}
hh_mm_converter = lambda h, m: datetime.datetime.now().replace(hour=int(h), minute=int(m), second=0)
week_day = 0
for q_id, q_str in questions.items():
msg = await ctx.reply(q_str)
resp = await self.handle_response(ctx.channel, ctx.author)
if resp == '':
await msg.edit(f'{msg.content} (已逾時)')
return await msg.add_reaction('\u274C')
if q_id == 'course_weekday':
try:
week_day = int(resp)
continue
except ValueError:
return await msg.reply('格式錯誤!')
except Exception:
return await msg.reply('未知錯誤!')
if 'time' in q_id:
data = resp.split(":")
sql_data[q_id] = hh_mm_converter(data[0], data[1])
continue
sql_data[q_id] = resp
# Dummy algorithm to generate datetime by weekday, bad practice i know, pls dont shit on me
# so that i dont have to refactor code and insert additional columns into db
def gen_date_by_weekday(date: datetime.datetime, weekday: int):
for i in range(1, 20):
modified_date = date.replace(day=i)
if (modified_date.weekday() + 1) == weekday:
return modified_date
for column in ('start_time', 'end_time'):
sql_data[column] = gen_date_by_weekday(sql_data.get(column), week_day)
return sql_data
@checks.is_admin()
@commands.group()
async def elearn(self, ctx):
"""Discord遠距教學系統,使用g!help [指令]來獲取幫助"""
if ctx.invoked_subcommand is None:
await ctx.send_help('elearn')
async def fetch_messangers(self, channel: discord.TextChannel, msg_start: discord.Message,
msg_end: discord.Message):
records = []
async for msg in channel.history(limit=2000, before=msg_end, after=msg_start):
# Skip admins
if msg.author.guild_permissions.manage_guild:
continue
try:
records.append(msg.author.id)
except Exception:
pass
return records
@elearn.command()
async def stg(self, ctx, minutes: int = None):
"""手動點名功能"""
if not minutes:
msg = await ctx.reply("請輸入開放點名的時間(分鐘),介於 `1 ~ 10`")
resp = await self.handle_response(ctx.channel, ctx.author)
if resp == '':
return await msg.edit(f"{msg}\n已逾時..")
try:
minutes = int(resp)
assert 1 <= minutes <= 10
except ValueError:
return await ctx.reply("請輸入數字")
except AssertionError:
return await ctx.reply("請輸入介於**`1 ~ 10`的數字**")
except Exception:
raise
now_min = datetime.datetime.now()
final_min = now_min + datetime.timedelta(minutes=minutes)
title = f"已開始計時點名 : `{now_min.hour}:{now_min.minute}` ~ `{final_min.hour}:{final_min.minute}`"
embed = discord.Embed(colour=0x00ff00, title=title)
msg_start = await ctx.send(embed=embed)
await self.handle_response(ctx.channel, ctx.author, minutes * 60)
embed.title, embed.colour = title.replace('已開始', '已結束'), 0xff0000
msg_end = await ctx.send(embed=embed)
records = await self.fetch_messangers(ctx.channel, msg_start, msg_end)
student_ids = []
for user_id in records:
if r := await self.fetch_students_row(ctx.guild.id, user_id):
student_ids.append(r.get('email').split('@')[0])
replied_students = '\n'.join(student_ids)
embed.description = f'**已點名學生**:\n```fix\n{replied_students}```'
await msg_end.edit(embed=embed)
# await self.record_attendance()
async def sql_table_print_att(self, ctx, date: datetime.datetime):
from cogs.utils.formats import TabularData
results = await self.fetch_attendance_rows(ctx.guild.id, date)
headers = list(results[0].keys())
table = TabularData()
table.set_columns(headers)
table.add_rows(list(r.values()) for r in results)
render = table.render()
fmt = f'```\n{render}\n```'
if len(fmt) > 2000:
fp = io.BytesIO(fmt.encode('utf-8'))
await ctx.send('Too many results...', file=discord.File(fp, 'results.txt'))
else:
await ctx.send(fmt)
@elearn.command()
async def att(self, ctx, time: str = None):
"""根據日期顯示自動點名資訊,直接列sql table
Example: g!elearn stg 2022-5-12"""
if not time:
msg = await ctx.reply("請輸入上課的絕對日期,Ex: `2022-05-12`")
resp = await self.handle_response(ctx.channel, ctx.author)
if resp == '':
return await msg.edit(f"{msg}\n已逾時..")
time = resp
try:
date = datetime.datetime.strptime(time, "%Y-%m-%d")
except ValueError:
return await ctx.reply('日期格式錯誤')
embed = discord.Embed(colour=0x00ff00, title='# TODO: DATA Visualization')
try:
await self.sql_table_print_att(ctx, date)
except IndexError:
embed.title = f'無此紀錄! - `{date}`'
except Exception:
raise
await ctx.reply(embed=embed)
@elearn.command()
async def init(self, ctx: commands.Context):
"""申請並設置遠距教學服務"""
await self._init_teacher_role(ctx.guild)
await self._init_channels_and_perms(ctx.guild)
try:
sql_data = await self._prompt_init(ctx)
except IndexError:
return await ctx.reply("格式錯誤!")
if not sql_data:
return
# today = datetime.datetime.now()
# sql_data = {'server_id': ctx.guild.id, 'teacher_id': ctx.author.id, 'school_domain': '',
# 'start_time': today.replace(hour=9, minute=59, second=0),
# 'end_time': today.replace(hour=12, minute=59, second=0)}
await self.record_courses(sql_data)
await ctx.reply("已成功設定!")
@elearn.command()
async def uninit(self, ctx: commands.Context):
"""註銷遠距教學服務"""
guild = ctx.guild
categories = ["▬▬▬ 全域資訊 ▬▬▬", "▬▬▬ 課程區 ▬▬▬", "▬▬▬ 其他區 ▬▬▬"]
categories = [discord.utils.get(guild.categories, name=x) for x in categories]
if v_c := discord.utils.find(lambda c: c.name == "學生驗證", guild.text_channels):
await v_c.delete()
for cat in categories:
if cat.voice_channels:
await discord.utils.find(lambda c: c.name == "上課語音", cat.voice_channels).delete()
for c in cat.text_channels:
await c.delete()
for cat in categories:
await cat.delete()
await ctx.message.add_reaction('\u2705')
async def record_attendance(self, user: discord.Member, time: datetime.datetime, lesson: int, is_in_vc: bool):
async with self.bot.school_pool.acquire() as conn:
sql = "INSERT INTO attendance ( student_id, server_id, lesson, time, is_in_vc ) VALUES ($1, $2, $3, $4, $5)" \
"RETURNING id"
att_id = await conn.execute(sql, user.id, user.guild.id, lesson, time, is_in_vc)
# self.fetch_attendance_rows.invalidate(self, att_id)
async def record_students(self, channel: discord.TextChannel, user: discord.Member, email: str):
# sourcery skip: assign-if-exp, swap-if-expression
info = await self.fetch_students_row(channel.guild.id, user.id)
async with self.bot.school_pool.acquire() as conn:
if not info:
sql = "INSERT INTO students ( student_id, server_id, email ) VALUES ($1, $2, $3) "
else:
sql = "UPDATE students SET email=$3 WHERE student_id=$1 and server_id=$2"
await conn.execute(sql, user.id, channel.guild.id, email)
# self.fetch_students_row.invalidate(self, server_id=channel.guild.id)
async def record_courses(self, sql_data: dict):
async with self.bot.school_pool.acquire() as conn:
sql = "INSERT INTO courses ( server_id, teacher_id, school_domain, start_time, end_time ) " \
"VALUES ($1, $2, $3, $4, $5) " \
"ON CONFLICT(server_id) " \
"DO UPDATE SET teacher_id=$2, school_domain=$3, start_time=$4, end_time=$5 " \
"RETURNING course_id"
# print(sql)
# print(sql_data.values())
course_id = await conn.fetchval(sql, *sql_data.values())
print(f"{course_id=}")
# self.fetch_courses_row.invalidate(self, server_id=sql_data.get('server_id'))
# @cache.cache()
async def fetch_courses_row(self, server_id: int):
query = """SELECT * FROM courses WHERE server_id=$1;"""
async with self.bot.school_pool.acquire(timeout=300.0) as con:
return await con.fetchrow(query, server_id)
# @cache.cache()
async def fetch_students_row(self, server_id: int, user_id: int = None):
if user_id:
query = """SELECT * FROM students WHERE server_id=$1 and student_id=$2"""
else:
query = """SELECT * FROM students WHERE server_id=$1"""
async with self.bot.school_pool.acquire(timeout=300.0) as con:
return await con.fetchrow(query, server_id, user_id)
# @cache.cache()
async def fetch_attendance_rows(self, server_id: int, date: datetime.datetime):
query = """SELECT student_id, lesson, time, is_in_vc FROM attendance
WHERE server_id=$1 and date(time) = $2"""
async with self.bot.school_pool.acquire(timeout=300.0) as con:
return await con.fetch(query, server_id, date)
# @cache.cache()
# async def fetch_attendance_rows(self, server_id: int):
# query = """SELECT * FROM attendance WHERE server_id=$1;"""
# async with self.bot.school_pool.acquire(timeout=300.0) as con:
# return await con.fetchrow(query, server_id)
@staticmethod
async def _give_perms(user: discord.Member):
student_role = await get_or_create_role(user.guild, '學生')
unverified_role = await get_or_create_role(user.guild, '未驗證', 0xFF6600)
await user.remove_roles(unverified_role)
await user.add_roles(student_role)
async def verify(self, channel: discord.TextChannel, user: discord.Member):
# sourcery skip: assign-if-exp, swap-if-expression
info = await self.fetch_courses_row(channel.guild.id)
valid_email_domain = info['school_domain']
await channel.send(f"{user.mention} 請輸入**學校email**以供驗證 (`XXXXXXXXX@{valid_email_domain}`) {TYPING}")
resp = await self.handle_response(channel, user)
email = resp.lower()
if not resp:
return await channel.send(f"{user.mention} 逾時")
if not re.fullmatch(EMAIL_REGEX, email):
return await channel.send(f"{user.mention} 格式不符!")
if not resp.endswith(f"@{valid_email_domain}"):
return await channel.send(f"{user.mention} email域名不符!")
await channel.send(f"{user.mention}\n已寄驗證碼到此信箱,請檢察\n輸入驗證碼 {TYPING}")
code = self.send_verify_code(email)
# debug
print(code)
resp = await self.handle_response(channel, user)
if resp == '':
return await channel.send(f"{user.mention} 驗證碼輸入逾時")
if str(code) not in resp:
return await channel.send(f"{user.mention} 驗證碼錯誤")
await self.record_students(channel, user, email)
await channel.send(f"`{user.id}` - {user.mention}\n已完成驗證")
try:
await self._give_perms(user)
finally:
embed = words.quote_embed(user)
await user.send(embed=embed)
# data = {"user_id": user.id, "server_id": channel.guild.id, "email": email}
# await channel.send(
# f"{user.mention},您已完成驗證\n將記錄到資料表 - `student`\n```json\n{json.dumps(data, indent=4, ensure_ascii=False)}```"
# )
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
if not message.guild:
return
# Check if teacher have registered elearn
data = await self.fetch_courses_row(message.guild.id)
if not data:
return
channel = message.channel
if channel.name != "學生驗證":
return
if message.content.lower() in ('verify', '驗證'):
await self.verify(channel, message.author)
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member):
if not member.guild:
return
# Check if teacher have registered elearn
data = await self.fetch_courses_row(server_id=member.guild.id)
print(f"{data=}")
if not data:
return
record = await self.fetch_students_row(member.guild.id, member.id)
unverified_role = await get_or_create_role(member.guild, '未驗證', 0xFF6600)
student_role = await get_or_create_role(member.guild, '學生')
if not record:
await member.add_roles(unverified_role)
if verify_chan := discord.utils.find(lambda c: c.name == "學生驗證", member.guild.text_channels):
await self.verify(verify_chan, member)
return
await member.add_roles(student_role)
async def filter_voice_event(self, member, before, after):
if before.channel == after.channel:
return
# Check if teacher have registered elearn
data = await self.fetch_courses_row(member.guild.id)
if not data:
return
before_channel = getattr(before, 'channel', [])
after_channel = getattr(after, 'channel', [])
# if not after.channel:
# return
if after_channel and after_channel.category.name != '▬▬▬ 課程區 ▬▬▬' and after_channel.name != '上課語音':
return
if before_channel and before_channel.category.name != '▬▬▬ 課程區 ▬▬▬' and before_channel.name != '上課語音':
return
# if member not in after.channel.members:
# return
course_record = await self.fetch_courses_row(member.guild.id)
if not course_record:
return
start_time: datetime.datetime = course_record['start_time']
end_time: datetime.datetime = course_record['end_time']
now: datetime.datetime = datetime.datetime.now()
now_weekday, att_weekday = now.weekday() + 1, start_time.weekday() + 1
# Start Filtering
if att_weekday != now_weekday:
return
# print('Passed WeekDat Check')
now_hour, att_start_hour, att_end_hour = now.hour, start_time.hour, end_time.hour
# att_start_hour, att_end_hour = 18, 21
if now_hour not in range(att_start_hour, att_end_hour + 1):
return
# print('Passed Hour Check')
# now_minute, att_start_minute, att_end_minute = now.minute, start_time.minute, end_time.minute
# att_start_minute, att_end_minute = 46, 40
lesson = att_end_hour - att_start_hour
return after_channel, lesson, now
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after: discord.VoiceState):
"""紀錄出缺勤"""
data = await self.filter_voice_event(member, before, after)
if not data:
return
after_channel, lesson, now = data
# print(
# f"MINUTE RANGE ATTENDANCE START: {range(att_start_minute - random.randint(3, 5), att_start_minute + random.randint(3, 5))}")
### 這演算法對我這種腦殘來說要想很久
# def valid_minute_range(now_minute, boundary_minute):
# return now_minute in range(boundary_minute-random.randint(3, 7), boundary_minute+random.randint(3, 7))
# print(f"{after_channel.members=}")
# When user joins channel in learning hours
if after_channel and member in after.channel.members:
# print('User join channel')
# if valid_minute_range(now_minute, att_start_minute):
print(f'Join elearn channel : {member}')
return await self.record_attendance(member, now, lesson, True)
else:
# When user leaves channel in learning hours
print(f'Left elearn channel : {member}')
return await self.record_attendance(member, now, lesson, False)
def setup(bot):
bot.add_cog(Elearn(bot))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment