Skip to content

Instantly share code, notes, and snippets.

@Mikubill
Created February 19, 2024 10:12
Show Gist options
  • Save Mikubill/8653882f002f3324ba14d5cf154b3e01 to your computer and use it in GitHub Desktop.
Save Mikubill/8653882f002f3324ba14d5cf154b3e01 to your computer and use it in GitHub Desktop.
kimi-discord-bot.py
import discord
import asyncio
import logging
import logging.handlers
from openai import OpenAI
from discord import app_commands
from discord import Message
# Credentials are intentionally blank here; fill them in before running.
KIMITOKEN = ""
DISCORD_BOT_TOKEN = ""

# Moonshot exposes an OpenAI-compatible endpoint, so the stock OpenAI client
# is pointed at it through ``base_url``.
aiclient = OpenAI(api_key=KIMITOKEN, base_url="https://api.moonshot.cn/v1")

# System prompt sent as the first message of every conversation. The pieces
# are joined without any separator.
INIT_SYS_PROMPT = (
    "你是 Kimi,由 Moonshot AI 提供的人工智能助手,你更擅长中文和英文的对话。"
    "你会为用户提供安全,有帮助,准确的回答。同时,你会拒绝一些涉及恐怖主义,种族歧视,黄色暴力等问题的回答。"
    "Moonshot AI 为专有名词,不可翻译成其他语言。"
)
class CustomFormatter(logging.Formatter):
    """Log formatter that colours each record with ANSI escape sequences.

    The timestamp, level name and logger name each get their own colour; the
    level-name colour depends on the record's level. Tracebacks are wrapped
    in red.
    """

    # (level number, ANSI colour prefix) pairs for the standard levels.
    LEVEL_COLORS = [
        (logging.DEBUG, "\x1b[40;1m"),
        (logging.INFO, "\x1b[34;1m"),
        (logging.WARNING, "\x1b[33;1m"),
        (logging.ERROR, "\x1b[31m"),
        (logging.CRITICAL, "\x1b[41m"),
    ]

    # One ready-made formatter per level, with that level's colour baked
    # into the format string.
    FORMATS = {
        levelno: logging.Formatter(
            "\x1b[30;1m%(asctime)s\x1b[0m "
            f"{ansi}%(levelname)-8s\x1b[0m "
            "\x1b[35m%(name)s\x1b[0m -> %(message)s",
            "%Y-%m-%d %H:%M:%S",
        )
        for levelno, ansi in LEVEL_COLORS
    }

    def format(self, record):
        """Return *record* rendered by the formatter matching its level.

        Records with a level that has no entry in ``FORMATS`` are rendered
        with the DEBUG formatter.
        """
        delegate = self.FORMATS.get(record.levelno, self.FORMATS[logging.DEBUG])

        # Pre-fill the traceback text, wrapped in red, so the delegate uses
        # it instead of generating an uncoloured one.
        if record.exc_info:
            traceback_text = delegate.formatException(record.exc_info)
            record.exc_text = f"\x1b[31m{traceback_text}\x1b[0m"

        rendered = delegate.format(record)

        # Clear the cached traceback text so the coloured version does not
        # stay attached to the record.
        record.exc_text = None
        return rendered
def setup_logger(module_name: str) -> logging.Logger:
    """Return an INFO-level logger that writes coloured output to the console.

    Args:
        module_name: Name used to look up the logger. Everything from the
            first ``".py"`` onwards is dropped before the lookup.

    Returns:
        The configured ``logging.Logger``. Calling this again with the same
        name returns the same logger without attaching a second console
        handler.
    """
    library, _, _ = module_name.partition(".py")
    logger = logging.getLogger(library)
    logger.setLevel(logging.INFO)

    # ``logging.getLogger`` hands back the same object for the same name, so
    # attaching a handler unconditionally would print every record once per
    # call to this function.
    already_configured = any(
        isinstance(handler.formatter, CustomFormatter) for handler in logger.handlers
    )
    if not already_configured:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(CustomFormatter())
        logger.addHandler(console_handler)

    return logger
class dcclient(discord.Client):
    """Discord client that forwards queued prompts to the chat-completion API.

    Command handlers put ``(message, user_message)`` pairs on
    ``message_queue``; ``process_messages`` takes them off one at a time and
    ``send_message`` streams the model's answer back to Discord.
    """

    # Discord refuses message content longer than this many characters.
    MAX_MESSAGE_LENGTH = 2000
    # Streamed text is buffered until it exceeds this many characters before
    # the Discord message is edited again.
    STREAM_EDIT_THRESHOLD = 10
    # Sentinel returned by ``next()`` once the completion stream is exhausted.
    _STREAM_END = object()

    def __init__(self) -> None:
        intents = discord.Intents.default()
        intents.message_content = True
        super().__init__(intents=intents)
        self.tree = app_commands.CommandTree(self)
        # Channel of the most recent command; used as a fallback for the
        # typing indicator when a queued item carries no channel.
        self.current_channel = None
        self.activity = discord.Activity(type=discord.ActivityType.listening, name="/chat")
        self.message_queue = asyncio.Queue()

    async def process_messages(self):
        """Drain ``message_queue`` forever, answering one request at a time.

        The queue is polled once per second. Any exception raised while
        handling a request, including a failure to start the typing
        indicator, is logged and the loop carries on with the next item.
        """
        while True:
            while not self.message_queue.empty():
                message, user_message = self.message_queue.get_nowait()
                # Show the typing indicator where the request came from, not
                # wherever the latest command happened to be issued.
                channel = getattr(message, "channel", None)
                if channel is None:
                    channel = self.current_channel
                try:
                    if channel is None:
                        await self.send_message(message, user_message)
                    else:
                        async with channel.typing():
                            await self.send_message(message, user_message)
                except Exception as e:
                    logger.exception(f"Error while processing message: {e}")
                finally:
                    self.message_queue.task_done()
            await asyncio.sleep(1)

    async def enqueue_message(self, message, user_message):
        """Queue a request for ``process_messages`` to answer."""
        await self.message_queue.put((message, user_message))

    @staticmethod
    async def _run_blocking(func, *args):
        """Run a blocking callable in the default executor and await its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    @classmethod
    def _paginate(cls, text):
        """Cut *text* into consecutive pieces of at most ``MAX_MESSAGE_LENGTH`` characters."""
        limit = cls.MAX_MESSAGE_LENGTH
        return [text[start:start + limit] for start in range(0, len(text), limit)]

    async def _render(self, handle, sent, text):
        """Make the Discord messages in *sent* show *text*.

        *text* only ever grows, so every page before the last one already
        sent is complete and is left alone. The last sent page is edited and
        any further pages are posted through *handle* and appended to *sent*.
        """
        pages = self._paginate(text)
        for index in range(max(len(sent) - 1, 0), len(pages)):
            page = pages[index]
            if index < len(sent):
                await sent[index].edit(content=page)
            elif page.strip():
                sent.append(await handle.send(page))
            else:
                # A page holding only whitespace cannot be posted on its own;
                # wait until more text has arrived.
                break

    async def _build_messages(self, user_message, custom_msg):
        """Build the ``messages`` list for the chat-completion request.

        For an attachment the file is uploaded for extraction, its extracted
        text is added as a second system message, and *custom_msg* (or a
        default summary request when it is ``None``) is used as the user
        prompt. Otherwise *user_message* itself is the user prompt.
        """
        messages = [{"role": "system", "content": INIT_SYS_PROMPT}]
        if not isinstance(user_message, discord.Attachment):
            messages.append({"role": "user", "content": user_message})
            return messages

        data = await user_message.read()
        file_object = await self._run_blocking(
            lambda: aiclient.files.create(file=data, purpose="file-extract")
        )
        logger.info(file_object.id)
        file_content = await self._run_blocking(
            lambda: aiclient.files.content(file_id=file_object.id).text
        )
        messages.append({"role": "system", "content": file_content})
        messages.append(
            {
                "role": "user",
                "content": "Please briefly introduce what this document talks about"
                if custom_msg is None
                else custom_msg,
            }
        )
        return messages

    async def send_message(self, message, user_message):
        """Answer one request and stream the reply into Discord.

        Args:
            message: Object the reply is sent through: its ``followup`` when
                it has one, otherwise its ``channel``.
            user_message: The prompt text, an attachment, or an
                ``(attachment, custom prompt)`` tuple.

        Errors raised while building or streaming the reply are logged and
        reported back through the same handle.
        """
        handle = message.followup if hasattr(message, "followup") else message.channel
        try:
            custom_msg = None
            if isinstance(user_message, tuple):
                user_message, custom_msg = user_message
                response = f"> **{user_message}** - {custom_msg}\n\n"
            else:
                response = f"> **{user_message}** \n\n"

            msg_to_send = await self._build_messages(user_message, custom_msg)

            # ``aiclient`` is synchronous: creating the completion and pulling
            # each chunk are done in the executor so the event loop keeps
            # running while waiting on the network.
            completion = await self._run_blocking(
                lambda: aiclient.chat.completions.create(
                    model="moonshot-v1-32k",
                    messages=msg_to_send,
                    temperature=0.3,
                    stream=True,
                )
            )

            sent = []
            await self._render(handle, sent, response)

            pending = ""
            stream = iter(completion)
            while True:
                chunk = await self._run_blocking(next, stream, self._STREAM_END)
                if chunk is self._STREAM_END:
                    break
                if not chunk.choices:
                    continue
                piece = chunk.choices[0].delta.content
                if not isinstance(piece, str):
                    continue
                pending += piece
                if len(pending) > self.STREAM_EDIT_THRESHOLD:
                    # Append the text untouched: stripping here would delete
                    # the spaces and newlines that fall on a flush boundary.
                    response += pending
                    pending = ""
                    await self._render(handle, sent, response)

            if pending:
                response += pending
                await self._render(handle, sent, response)
        except Exception as e:
            logger.exception(f"Error while sending : {e}")
            # Keep the report itself inside the message length limit.
            detail = str(e)[:1500]
            await handle.send(
                f"> **ERROR: Something went wrong, please try again later!** \n ```ERROR MESSAGE: {detail}```"
            )
def run_discord_bot():
    """Register the event handler and slash commands on ``client`` and run it.

    Blocks inside ``client.run`` until the bot stops.
    """

    async def submit(interaction, payload, command, detail):
        """Acknowledge *interaction*, log it, and queue *payload* for answering."""
        # Acknowledge first: the reply goes out through ``interaction.followup``,
        # which is only usable once the interaction has been responded to, and
        # the queue worker may pick the request up at any moment.
        await interaction.response.defer(thinking=True)
        client.current_channel = interaction.channel
        logger.info(
            f"\x1b[31m{interaction.user}\x1b[0m : /{command} [{detail}] in ({client.current_channel})"
        )
        await client.enqueue_message(interaction, payload)

    @client.event
    async def on_ready():
        await client.tree.sync()
        # on_ready can be dispatched more than once (for example after a
        # reconnect); keep a single queue worker and hold a reference to it.
        worker = getattr(client, "_queue_worker", None)
        if worker is None or worker.done():
            client._queue_worker = asyncio.get_running_loop().create_task(
                client.process_messages()
            )
        logger.info(f"{client.user} is now running!")

    @client.tree.command(name="chat", description="Have a chat with Kimi!")
    async def chat(interaction: discord.Interaction, *, message: str):
        if interaction.user == client.user:
            return
        await submit(interaction, message, "chat", message)

    @client.tree.command(name="chat_file", description="Have a file chat with Kimi!")
    async def chat_file(interaction: discord.Interaction, *, file: discord.Attachment):
        if interaction.user == client.user:
            return
        await submit(interaction, file, "chat_file", file.content_type)

    @client.tree.command(name="chat_file_custom", description="Have a file chat with custom message with Kimi!")
    async def chat_file_custom(interaction: discord.Interaction, *, file: discord.Attachment, message: str):
        if interaction.user == client.user:
            return
        await submit(interaction, (file, message), "chat_file_custom", file.content_type)

    client.run(DISCORD_BOT_TOKEN)
# Module-level singletons. ``dcclient`` methods and ``run_discord_bot`` look
# these names up as globals, so they must exist before the bot starts.
logger: logging.Logger = setup_logger(__name__)
client: dcclient = dcclient()

# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    run_discord_bot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment