Skip to content

Instantly share code, notes, and snippets.

@PikalaxALT
Created February 9, 2025 15:17
Show Gist options
  • Save PikalaxALT/5a774ca636a5ebfccfaefdf7e1a23353 to your computer and use it in GitHub Desktop.
Save PikalaxALT/5a774ca636a5ebfccfaefdf7e1a23353 to your computer and use it in GitHub Desktop.
Concept for a simple moderation bot
import asyncio
import contextlib
import datetime
import os

import asyncpg
import asyncpg.transaction
import discord
import dotenv
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Bot
# Load variables from a .env file (if one is found) into the process environment.
dotenv.load_dotenv()
# Channel that receives one log line per strike (pret/strike-log).
STRIKE_LOG_CHANNEL = 1337633142487519314

# Added to a timeout's expiry to get the moment the user's warn level resets.
WARNLEVEL_EXPIRES_AFTER = datetime.timedelta(weeks=1)

# Timeout length for each warn level; the warn level is the index.
TIMEOUT_DURATIONS = [
    datetime.timedelta(minutes=1),
    datetime.timedelta(minutes=5),
    datetime.timedelta(minutes=10),
    datetime.timedelta(hours=1),
    datetime.timedelta(days=1),
    datetime.timedelta(weeks=1),
]
class MessageLock(asyncio.Lock):
    """To prevent two moderators from trying to strike the same message at the same time.

    ``owner`` is the user recorded by whoever last acquired the lock; it is
    ``None`` on a freshly created lock.
    """

    def __init__(self):
        # asyncio.Lock keeps its state in attributes created by its own
        # __init__; skipping it makes locked()/acquire()/release() fail with
        # AttributeError.
        super().__init__()
        self.owner: "discord.User | None" = None
class MyBot(Bot):
    """The discord bot.

    Owns the asyncpg connection pool and a registry of per-message locks.
    ``locked_messages`` maps a message to the ``MessageLock`` guarding it and
    ``locked_messages_mutex`` serialises access to that mapping.
    """

    locked_messages: dict[discord.Message, MessageLock]
    locked_messages_mutex: asyncio.Lock
    # Stays None until setup_hook() has run, so close() is safe after a failed start.
    db_pool: "asyncpg.Pool | None" = None

    async def setup_hook(self):
        """Create the lock registry, open the database pool and ensure the tables exist."""
        self.locked_messages_mutex = asyncio.Lock()
        self.locked_messages = {}
        self.db_pool = await asyncpg.create_pool(database="strikebot", user="strikebot")
        async with self.db_pool.acquire() as con:
            con: asyncpg.Connection
            async with con.transaction():
                # Manage the timeouts and warn levels
                await con.execute(
                    "CREATE TABLE IF NOT EXISTS pret_strike_users_v1 ("
                    # PostgreSQL has no AUTOINCREMENT keyword; SERIAL is the
                    # auto-numbered integer type.
                    "id SERIAL PRIMARY KEY, "
                    "user_id BIGINT UNIQUE NOT NULL, "
                    "last_timeout TIMESTAMP NOT NULL, "
                    "warn_expiry TIMESTAMP, "
                    "warn_level INT NOT NULL DEFAULT 0"
                    ");"
                )
                # Scoreboard for the moderators
                await con.execute(
                    "CREATE TABLE IF NOT EXISTS pret_strike_moderators_v1 ("
                    "id SERIAL PRIMARY KEY, "
                    # UNIQUE is required by the ON CONFLICT (user_id) upsert
                    # issued when a strike is recorded.
                    "user_id BIGINT UNIQUE NOT NULL, "
                    "action_count INT NOT NULL DEFAULT 0"
                    ");"
                )
                # Join table
                await con.execute(
                    "CREATE TABLE IF NOT EXISTS pret_strike_messages_v1 ("
                    "id SERIAL PRIMARY KEY, "
                    "message_id BIGINT NOT NULL, "
                    # Foreign keys are written "table (column)", not "table.column".
                    "striker_id INT REFERENCES pret_strike_moderators_v1 (id), "
                    "author_id INT REFERENCES pret_strike_users_v1 (id), "
                    "reason TEXT"
                    ");"
                )

    async def close(self):
        """Shut the bot down, then close the database pool if one was opened."""
        try:
            await super().close()
        finally:
            if self.db_pool is not None:
                await self.db_pool.close()

    # synchronization primitives
    async def _get_lock(self, message: discord.Message) -> MessageLock:
        """Return the lock guarding *message*, creating it on first use."""
        async with self.locked_messages_mutex:
            lock = self.locked_messages.get(message)
            if lock is None:
                lock = self.locked_messages[message] = MessageLock()
            return lock

    async def _try_acquire(self, message: discord.Message, user: discord.User):
        """Return ``(lock, acquired)``; acquire for *user* only if the lock is free."""
        lock = await self._get_lock(message)
        if lock.locked():
            return lock, False
        # The lock was just seen unlocked, so this normally returns at once.
        # It is awaited outside the registry mutex so that, if tasks are
        # already queued on the lock, release_message() can still run.
        await lock.acquire()
        lock.owner = user
        return lock, True

    async def try_lock_message(self, message: discord.Message, user: discord.User):
        """Lock *message* for *user* unless it is already locked.

        The lock is returned either way; compare ``lock.owner`` with *user* to
        find out who holds it.
        """
        lock, _ = await self._try_acquire(message, user)
        return lock

    async def lock_message(self, message: discord.Message, user: discord.User):
        """Wait until *message* can be locked, lock it for *user* and return the lock."""
        lock = await self._get_lock(message)
        # Wait outside the registry mutex: release_message() needs that mutex,
        # so holding it while blocked here would deadlock.
        await lock.acquire()
        lock.owner = user
        return lock

    async def release_message(self, message: discord.Message):
        """Release the lock on *message* and clear its owner.

        Raises:
            RuntimeError: if *message* has no lock or the lock is not held.
        """
        async with self.locked_messages_mutex:
            lock = self.locked_messages.get(message)
            if lock is None:
                raise RuntimeError("Message is not locked")
            lock.release()
            lock.owner = None

    @contextlib.asynccontextmanager
    async def message_as_locked(self, message: discord.Message, user: discord.User):
        """Hold the lock on *message* for the duration of the ``async with`` body."""
        lock = await self.lock_message(message, user)
        try:
            yield lock
        finally:
            # Release even when the body raises, otherwise the message stays
            # locked for good.
            await self.release_message(message)

    @contextlib.asynccontextmanager
    async def try_message_as_locked(self, message: discord.Message, user: discord.User):
        """Yield the lock on *message*, held by *user* only if it was free.

        The lock is released on exit only when this call acquired it, so a
        rejected caller never releases another moderator's lock.

        NOTE(review): if the same user enters twice for one message, the second
        entry also sees ``lock.owner == user`` although it holds nothing.
        """
        lock, acquired = await self._try_acquire(message, user)
        try:
            yield lock
        finally:
            if acquired:
                await self.release_message(message)
# commands.Bot requires a command prefix even when only application commands
# are used; responding to mentions only avoids needing the message-content intent.
bot = MyBot(
    command_prefix=commands.when_mentioned,
    intents=discord.Intents.default(),
)
class _StrikeReasonModal(discord.ui.Modal):
    """Dialog that asks the moderator for the reason behind a strike."""

    reason = discord.ui.TextInput(label="Reason", required=True, max_length=200)

    def __init__(self):
        # A modal must have a title. The timeout bounds how long strike()
        # waits (and keeps the message locked) when the dialog is dismissed
        # without being submitted.
        super().__init__(title="Strike reason", timeout=300.0)
        self.submit_interaction = None

    async def on_submit(self, interaction: discord.Interaction):
        """Acknowledge the submission and keep its interaction for follow-ups."""
        self.submit_interaction = interaction
        # The submission is a separate interaction and must be answered;
        # deferring leaves time for the database and Discord calls.
        await interaction.response.defer(ephemeral=True, thinking=True)
        self.stop()


async def _apply_strike(member, moderator, message, reason):
    """Record the strike and time out *member*; return when the timeout expires.

    All database writes share one transaction and the Discord call comes last,
    so a failed timeout rolls every write back.
    """
    async with bot.db_pool.acquire() as con:
        con: asyncpg.Connection
        async with con.transaction():
            # Columns are qualified with the table alias because bare names
            # are ambiguous inside ON CONFLICT DO UPDATE.
            record: asyncpg.Record = await con.fetchrow(
                "INSERT INTO pret_strike_users_v1 AS u (user_id, last_timeout, warn_level) "
                "VALUES ($1, NOW(), 0) "
                "ON CONFLICT (user_id) DO UPDATE SET "
                "last_timeout = NOW(), "
                "warn_level = (CASE WHEN (NOW() >= u.warn_expiry) THEN 0 "
                "ELSE LEAST(u.warn_level + 1, $2) END) "
                "RETURNING id, warn_level;",
                member.id,
                len(TIMEOUT_DURATIONS) - 1,
            )
            row_id = record["id"]
            duration = TIMEOUT_DURATIONS[record["warn_level"]]
            # Let the database add the interval so the result does not depend
            # on whether the column is TIMESTAMP or TIMESTAMPTZ.
            await con.execute(
                "UPDATE pret_strike_users_v1 "
                "SET warn_expiry = last_timeout + $2::interval WHERE id = $1;",
                row_id,
                duration + WARNLEVEL_EXPIRES_AFTER,
            )
            mod_id = await con.fetchval(
                "INSERT INTO pret_strike_moderators_v1 AS m (user_id, action_count) VALUES ($1, 1) "
                "ON CONFLICT (user_id) DO UPDATE SET "
                "action_count = m.action_count + 1 "
                "RETURNING id;",
                moderator.id,
            )
            await con.execute(
                "INSERT INTO pret_strike_messages_v1 (message_id, striker_id, author_id, reason) "
                "VALUES ($1, $2, $3, $4);",
                message.id,
                mod_id,
                row_id,
                reason,
            )
            # Member.timeout() needs a timezone-aware datetime.
            expiry = discord.utils.utcnow() + duration
            await member.timeout(
                expiry, reason=reason
            )  # if this fails, rollback the transaction
    return expiry


# A command that receives a discord.Message has to be a message context menu;
# slash commands cannot take a message as a parameter.
@bot.tree.context_menu(name="Strike")
@app_commands.guild_install()
@app_commands.checks.has_permissions(moderate_members=True)
async def strike(interaction: discord.Interaction, message: discord.Message):
    """Issues a warning strike for the given message.

    Asks the invoking moderator for a reason, times out the message author for
    the duration matching their warn level, records the strike in the database
    and posts a summary to the strike log channel.
    """
    async with bot.try_message_as_locked(message, interaction.user) as lock:
        if lock.owner != interaction.user:
            await interaction.response.send_message(
                f"This message is already being actioned by {lock.owner.mention}",
                ephemeral=True,
            )
            return
        member = message.author
        # The author is a plain User (no timeout support) once they are no
        # longer in the guild.
        if not isinstance(member, discord.Member):
            await interaction.response.send_message(
                "The author of this message is not a member of this server",
                ephemeral=True,
            )
            return
        # is_timed_out() also treats an expiry that already passed as "not timed out".
        if member.is_timed_out():
            await interaction.response.send_message(
                "This user is already timed out", ephemeral=True
            )
            return

        # Open a dialog to capture the reason for the strike
        modal = _StrikeReasonModal()
        await interaction.response.send_modal(modal)
        if await modal.wait():  # True: the dialog timed out without a submission
            return
        followup = modal.submit_interaction.followup
        reason = modal.reason.value.strip()
        if not reason:
            await followup.send("No reason given, strike cancelled", ephemeral=True)
            return

        try:
            expiry = await _apply_strike(member, interaction.user, message, reason)
        except discord.HTTPException as exc:
            await followup.send(
                f"Could not time out {member.mention}: {exc}", ephemeral=True
            )
            return

        await followup.send(
            f"Timed out {member.mention}, expires <t:{int(expiry.timestamp())}:R>",
            ephemeral=True,
        )
        # get_channel() only consults the cache; fetch the channel when it is
        # not cached instead of failing on None.
        log_channel = bot.get_channel(STRIKE_LOG_CHANNEL)
        if log_channel is None:
            log_channel = await bot.fetch_channel(STRIKE_LOG_CHANNEL)
        await log_channel.send(
            f"{interaction.user.mention} timed out {member.mention} for reason: {reason}\n"
            f"Expires: <t:{int(expiry.timestamp())}:R>\n"
            f"Message: {message.jump_url}"
        )
def main():
    """Run the bot with the token taken from the ``DISCORD_TOKEN`` environment variable.

    Raises:
        RuntimeError: if ``DISCORD_TOKEN`` is unset or empty.
    """
    # dotenv.get_key() needs the path of a .env file as its first argument.
    # load_dotenv() has already copied that file into the environment, so read
    # the token from there; this also honours a variable set by the shell.
    token = os.environ.get("DISCORD_TOKEN")
    if not token:
        raise RuntimeError("DISCORD_TOKEN is not set in the environment or .env file")
    bot.run(token)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment