Created
February 9, 2025 15:17
-
-
Save PikalaxALT/5a774ca636a5ebfccfaefdf7e1a23353 to your computer and use it in GitHub Desktop.
Concept for a simple moderation bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import contextlib | |
import datetime | |
import asyncpg | |
import asyncpg.transaction | |
import discord | |
import dotenv | |
from discord import app_commands | |
from discord.ext.commands import Bot | |
dotenv.load_dotenv() | |
# Channel that receives one log entry per strike (pret/strike-log).
STRIKE_LOG_CHANNEL = 1337633142487519314

# Added on top of a timeout's expiry to decide when the stored warn level
# is considered stale and resets.
WARNLEVEL_EXPIRES_AFTER = datetime.timedelta(weeks=1)

# Escalation ladder: the user's warn level is the index into this list, so
# the last entry is the longest timeout that will ever be applied.
TIMEOUT_DURATIONS = [
    datetime.timedelta(**step)
    for step in (
        {"minutes": 1},
        {"minutes": 5},
        {"minutes": 10},
        {"hours": 1},
        {"days": 1},
        {"days": 7},
    )
]
class MessageLock(asyncio.Lock):
    """To prevent two moderators from trying to strike the same message at the same time.

    Behaves like a plain ``asyncio.Lock`` and additionally remembers, in
    ``owner``, which user currently holds it (``None`` while nobody does).
    """

    def __init__(self):
        # The base initialiser creates the lock's internal state; without it
        # locked()/acquire()/release() fail with AttributeError.
        super().__init__()
        self.owner: "discord.User | None" = None
class MyBot(Bot):
    """The discord bot.

    Owns the asyncpg connection pool (``db_pool``) and a registry of
    per-message locks so that only one moderator can action a given message
    at a time.
    """

    # One lock per message that has ever been actioned. Entries are kept for
    # the lifetime of the process: dropping one while a task still waits on
    # it would let a second task create a fresh lock for the same message.
    locked_messages: dict[discord.Message, MessageLock]
    # Guards lookups/insertions in ``locked_messages``.
    locked_messages_mutex: asyncio.Lock

    async def setup_hook(self):
        """Create the lock registry, the connection pool and the tables."""
        self.locked_messages_mutex = asyncio.Lock()
        self.locked_messages = {}
        self.db_pool = await asyncpg.create_pool(database="strikebot", user="strikebot")
        async with self.db_pool.acquire() as con:
            con: asyncpg.Connection
            async with con.transaction():
                # Manage the timeouts and warn levels.
                # NOW() yields a timestamptz, so the columns are TIMESTAMPTZ
                # to avoid silently dropping the time zone.
                await con.execute(
                    "CREATE TABLE IF NOT EXISTS pret_strike_users_v1 ("
                    "id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, "
                    "user_id BIGINT UNIQUE NOT NULL, "
                    "last_timeout TIMESTAMPTZ NOT NULL, "
                    "warn_expiry TIMESTAMPTZ, "
                    "warn_level INT NOT NULL DEFAULT 0"
                    ");"
                )
                # Scoreboard for the moderators.
                # user_id must be UNIQUE: the strike upsert targets it with
                # ON CONFLICT (user_id).
                await con.execute(
                    "CREATE TABLE IF NOT EXISTS pret_strike_moderators_v1 ("
                    "id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, "
                    "user_id BIGINT UNIQUE NOT NULL, "
                    "action_count INT NOT NULL DEFAULT 0"
                    ");"
                )
                # Join table
                await con.execute(
                    "CREATE TABLE IF NOT EXISTS pret_strike_messages_v1 ("
                    "id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, "
                    "message_id BIGINT NOT NULL, "
                    "striker_id INT REFERENCES pret_strike_moderators_v1 (id), "
                    "author_id INT REFERENCES pret_strike_users_v1 (id), "
                    "reason TEXT"
                    ");"
                )

    async def close(self):
        """Shut the bot down, then close the database pool."""
        try:
            await super().close()
        finally:
            # setup_hook may never have run (e.g. login failed), in which
            # case there is no pool to close.
            pool = getattr(self, "db_pool", None)
            if pool is not None:
                await pool.close()

    # synchronization primitives

    async def _get_message_lock(self, message: discord.Message) -> MessageLock:
        """Return the lock registered for ``message``, creating it on first use."""
        async with self.locked_messages_mutex:
            lock = self.locked_messages.get(message)
            if lock is None:
                lock = self.locked_messages[message] = MessageLock()
            return lock

    async def _try_lock_message(self, message: discord.Message, user: discord.User):
        """Non-blocking acquire; returns ``(lock, acquired_by_this_call)``."""
        lock = await self._get_message_lock(message)
        acquired = not lock.locked()
        if acquired:
            await lock.acquire()
            lock.owner = user
        return lock, acquired

    async def try_lock_message(self, message: discord.Message, user: discord.User):
        """Lock ``message`` for ``user`` unless somebody already holds it.

        Returns the message's lock either way; ``lock.owner`` tells the caller
        who actually holds it.
        """
        lock, _ = await self._try_lock_message(message, user)
        return lock

    async def lock_message(self, message: discord.Message, user: discord.User):
        """Wait until ``message`` can be locked for ``user`` and return the lock."""
        lock = await self._get_message_lock(message)
        # Wait outside the registry mutex: release_message() needs that mutex,
        # so waiting while holding it would deadlock against the current owner.
        await lock.acquire()
        lock.owner = user
        return lock

    async def release_message(self, message: discord.Message):
        """Release the lock held on ``message``.

        Raises:
            RuntimeError: if ``message`` has no lock or the lock is not held.
        """
        async with self.locked_messages_mutex:
            lock = self.locked_messages.get(message)
            if lock is None:
                raise RuntimeError("message is not locked")
            lock.owner = None
            lock.release()

    @contextlib.asynccontextmanager
    async def message_as_locked(self, message: discord.Message, user: discord.User):
        """Hold the lock on ``message`` for the duration of the ``async with`` body."""
        lock = await self.lock_message(message, user)
        try:
            yield lock
        finally:
            await self.release_message(message)

    @contextlib.asynccontextmanager
    async def try_message_as_locked(self, message: discord.Message, user: discord.User):
        """Try to hold the lock on ``message`` for the ``async with`` body.

        Always yields the message's lock; callers must compare ``lock.owner``
        with ``user`` to learn whether somebody else holds it. The lock is
        released on exit only if this call acquired it, so another
        moderator's lock is never released by mistake.
        """
        lock, acquired = await self._try_lock_message(message, user)
        try:
            yield lock
        finally:
            if acquired:
                await self.release_message(message)
# commands.Bot requires a command prefix even when only application commands
# are used; responding to mentions only avoids needing message content.
bot = MyBot(
    command_prefix=discord.ext.commands.when_mentioned,
    intents=discord.Intents.default(),
)
# Seconds a moderator has to fill in the reason dialog. Dismissing a modal
# sends no event, so without a timeout the message lock would be held forever.
_STRIKE_MODAL_TIMEOUT = 300.0


class _StrikeReasonModal(discord.ui.Modal, title="Strike reason"):
    """Dialog that captures the reason for a strike."""

    reason = discord.ui.TextInput(label="Reason", required=True, max_length=200)

    def __init__(self):
        super().__init__(timeout=_STRIKE_MODAL_TIMEOUT)
        # Interaction created by the submit; used to report the outcome.
        self.interaction = None

    async def on_submit(self, interaction: discord.Interaction):
        # The submit must be acknowledged or Discord reports a failure.
        self.interaction = interaction
        await interaction.response.defer(ephemeral=True, thinking=True)


# A message can only be passed to a context-menu command, not a slash command.
@bot.tree.context_menu(name="Strike")
@app_commands.guild_install()
@app_commands.checks.has_permissions(moderate_members=True)
async def strike(interaction: discord.Interaction, message: discord.Message):
    """Issues a warning strike for the given message.

    Asks the invoking moderator for a reason, times the message author out
    for the duration matching their warn level, records the strike in the
    database and posts a summary to the strike log channel.
    """
    author = message.author
    if not isinstance(author, discord.Member):
        # Only guild members can be timed out.
        await interaction.response.send_message(
            "This user is not a member of this server", ephemeral=True
        )
        return

    async with bot.try_message_as_locked(message, interaction.user) as lock:
        if lock.owner != interaction.user:
            await interaction.response.send_message(
                f"This message is already being actioned by {lock.owner.mention}",
                ephemeral=True,
            )
            return
        # is_timed_out() ignores timeouts that have already expired.
        if author.is_timed_out():
            await interaction.response.send_message(
                "This user is already timed out", ephemeral=True
            )
            return

        # Open a dialog to capture the reason for the strike
        modal = _StrikeReasonModal()
        await interaction.response.send_modal(modal)
        if await modal.wait():  # True means the dialog timed out
            return
        submit = modal.interaction
        if submit is None:
            return
        reason = modal.reason.value
        if not reason:
            await submit.followup.send("No reason given; strike cancelled", ephemeral=True)
            return

        async with bot.db_pool.acquire() as con:
            con: asyncpg.Connection
            try:
                async with con.transaction():
                    # Columns are table-qualified: inside ON CONFLICT DO UPDATE
                    # a bare name is ambiguous between the row and EXCLUDED.
                    record: asyncpg.Record = await con.fetchrow(
                        "INSERT INTO pret_strike_users_v1 (user_id, last_timeout, warn_level) VALUES ($1, NOW(), 0) "
                        "ON CONFLICT (user_id) DO UPDATE SET "
                        "last_timeout = NOW(), "
                        "warn_level = (CASE WHEN (NOW() >= pret_strike_users_v1.warn_expiry) THEN 0 "
                        "ELSE LEAST(pret_strike_users_v1.warn_level + 1, $2) END) "
                        "RETURNING id, warn_level;",
                        author.id,
                        len(TIMEOUT_DURATIONS) - 1,
                    )
                    row_id = record["id"]
                    warn_level: int = record["warn_level"]
                    duration = TIMEOUT_DURATIONS[warn_level]
                    await author.timeout(
                        duration, reason=reason
                    )  # if this fails, rollback the transaction
                    # Only used for the log message below.
                    expiry = datetime.datetime.now(datetime.timezone.utc) + duration
                    # Computed in SQL from last_timeout so it is correct for
                    # either timestamp column type.
                    await con.execute(
                        "UPDATE pret_strike_users_v1 SET warn_expiry = last_timeout + $2::interval WHERE id = $1;",
                        row_id,
                        duration + WARNLEVEL_EXPIRES_AFTER,
                    )
                    mod_id = await con.fetchval(
                        "INSERT INTO pret_strike_moderators_v1 (user_id, action_count) VALUES ($1, 1) "
                        "ON CONFLICT (user_id) DO UPDATE SET "
                        "action_count = pret_strike_moderators_v1.action_count + 1 "
                        "RETURNING id;",
                        interaction.user.id,
                    )
                    await con.execute(
                        "INSERT INTO pret_strike_messages_v1 (message_id, striker_id, author_id, reason) VALUES ($1, $2, $3, $4);",
                        message.id,
                        mod_id,
                        row_id,
                        reason,
                    )
            except discord.HTTPException as exc:
                # The timeout was rejected (e.g. missing permissions); the
                # transaction has been rolled back.
                await submit.followup.send(
                    f"Could not time out {author.mention}: {exc}", ephemeral=True
                )
                return

        await submit.followup.send(
            f"Timed out {author.mention} until <t:{int(expiry.timestamp())}:f>",
            ephemeral=True,
        )
        # get_channel() only consults the cache; fall back to the API.
        log_channel = bot.get_channel(STRIKE_LOG_CHANNEL)
        if log_channel is None:
            log_channel = await bot.fetch_channel(STRIKE_LOG_CHANNEL)
        await log_channel.send(
            f"{interaction.user.mention} timed out {author.mention} for reason: {reason}\n"
            f"Expires: <t:{int(expiry.timestamp())}:R>\n"
            f"Message: {message.jump_url}"
        )
def main():
    """Run the bot with the token taken from the ``DISCORD_TOKEN`` variable.

    Raises:
        RuntimeError: if ``DISCORD_TOKEN`` is unset or empty.
    """
    # Local import keeps this block self-contained.
    import os

    # dotenv.load_dotenv() at import time has already copied .env into the
    # process environment, so read the token from there. dotenv.get_key()
    # needs the .env path as its first argument and is not suitable here.
    token = os.environ.get("DISCORD_TOKEN")
    if not token:
        raise RuntimeError("DISCORD_TOKEN is not set in the environment or .env file")
    bot.run(token)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment