Created
February 19, 2024 10:12
-
-
Save Mikubill/8653882f002f3324ba14d5cf154b3e01 to your computer and use it in GitHub Desktop.
kimi-discord-bot.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import discord | |
import asyncio | |
import logging | |
import logging.handlers | |
from openai import OpenAI | |
from discord import app_commands | |
from discord import Message | |
# Credentials are left blank in the source; fill them in before running.
KIMITOKEN = ""
DISCORD_BOT_TOKEN = ""

# OpenAI-SDK client pointed at the Moonshot endpoint.
aiclient = OpenAI(base_url="https://api.moonshot.cn/v1", api_key=KIMITOKEN)

# System prompt sent as the first message of every completion request.
# NOTE(review): rebuilt from three backslash-continued lines; this assumes the
# continuation lines carried no leading whitespace -- confirm against the
# original file.
INIT_SYS_PROMPT = (
    "你是 Kimi,由 Moonshot AI 提供的人工智能助手,你更擅长中文和英文的对话。"
    "你会为用户提供安全,有帮助,准确的回答。同时,你会拒绝一些涉及恐怖主义,种族歧视,黄色暴力等问题的回答。"
    "Moonshot AI 为专有名词,不可翻译成其他语言。"
)
class CustomFormatter(logging.Formatter):
    """Console formatter that colours each record with ANSI escape codes.

    One pre-built ``logging.Formatter`` is kept per level in ``FORMATS``;
    ``format`` picks the one matching the record and delegates to it.
    """

    # (level, ANSI colour prefix applied to the level name)
    LEVEL_COLORS = [
        (logging.DEBUG, "\x1b[40;1m"),
        (logging.INFO, "\x1b[34;1m"),
        (logging.WARNING, "\x1b[33;1m"),
        (logging.ERROR, "\x1b[31m"),
        (logging.CRITICAL, "\x1b[41m"),
    ]

    # level -> ready-to-use formatter with that level's colour baked in
    FORMATS = {}
    for _level, _color in LEVEL_COLORS:
        FORMATS[_level] = logging.Formatter(
            f"\x1b[30;1m%(asctime)s\x1b[0m {_color}%(levelname)-8s\x1b[0m \x1b[35m%(name)s\x1b[0m -> %(message)s",
            "%Y-%m-%d %H:%M:%S",
        )
    # Keep the loop variables out of the class namespace.
    del _level, _color

    def format(self, record):
        """Return ``record`` rendered with the formatter for its level.

        Records whose level has no entry in ``FORMATS`` use the DEBUG
        formatter. Any attached exception is rendered in red.
        """
        chosen = self.FORMATS.get(record.levelno, self.FORMATS[logging.DEBUG])

        if record.exc_info:
            # Pre-fill exc_text so the delegate prints the traceback in red.
            trace = chosen.formatException(record.exc_info)
            record.exc_text = f"\x1b[31m{trace}\x1b[0m"

        rendered = chosen.format(record)

        # Clear the cached text so the coloured traceback is not reused
        # the next time this record is formatted.
        record.exc_text = None
        return rendered
def setup_logger(module_name: str) -> logging.Logger:
    """Return an INFO-level logger that writes coloured output to the console.

    The logger name is ``module_name`` with everything from the first
    ``".py"`` onwards removed. Calling this function again for the same name
    returns the same logger without attaching a second console handler, so
    records are not printed twice.

    Args:
        module_name: Module name or file name to derive the logger name from.

    Returns:
        The configured ``logging.Logger``.
    """
    library, _, _ = module_name.partition(".py")
    logger = logging.getLogger(library)
    logger.setLevel(logging.INFO)

    # logging.getLogger returns a shared instance per name, so only install
    # the coloured console handler if a previous call has not already done so.
    already_configured = any(
        isinstance(handler.formatter, CustomFormatter) for handler in logger.handlers
    )
    if not already_configured:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(CustomFormatter())
        logger.addHandler(console_handler)

    return logger
class dcclient(discord.Client):
    """Discord client that forwards queued prompts to the Kimi chat API.

    Slash-command handlers put ``(interaction, payload)`` pairs on
    ``message_queue``; ``process_messages`` drains the queue one item at a
    time and ``send_message`` streams the model's answer back to Discord.
    """

    # Discord rejects message content longer than this many characters.
    MAX_MESSAGE_LENGTH = 2000
    # The visible reply is edited once more than this many new characters
    # have been buffered from the stream.
    STREAM_FLUSH_THRESHOLD = 10

    def __init__(self) -> None:
        intents = discord.Intents.default()
        intents.message_content = True
        super().__init__(intents=intents)
        self.tree = app_commands.CommandTree(self)
        # Channel of the most recent command; written by the command handlers.
        self.current_channel = None
        self.activity = discord.Activity(type=discord.ActivityType.listening, name="/chat")
        # Pending (message, user_message) pairs awaiting a reply.
        self.message_queue = asyncio.Queue()

    async def process_messages(self):
        """Drain ``message_queue`` forever, answering one item at a time.

        The queue is polled once per second and items are taken with
        ``get_nowait`` so this coroutine never waits on the queue itself.
        Any exception raised while answering an item (including a failure to
        start the typing indicator) is logged and the loop continues.
        """
        while True:
            while not self.message_queue.empty():
                message, user_message = self.message_queue.get_nowait()
                try:
                    # Prefer the channel the request came from; fall back to
                    # the most recently used channel.
                    channel = getattr(message, "channel", None)
                    if channel is None:
                        channel = self.current_channel
                    if channel is None:
                        await self.send_message(message, user_message)
                    else:
                        async with channel.typing():
                            await self.send_message(message, user_message)
                except Exception as e:
                    logger.exception(f"Error while processing message: {e}")
                finally:
                    self.message_queue.task_done()
            await asyncio.sleep(1)

    async def enqueue_message(self, message, user_message):
        """Queue ``(message, user_message)`` for ``process_messages``."""
        await self.message_queue.put((message, user_message))

    @staticmethod
    async def _call_in_thread(func, *args):
        """Run blocking ``func(*args)`` in the default executor and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def _show_text(self, handle, msg, text):
        """Display ``text`` in ``msg``, spilling into new messages past the limit.

        Args:
            handle: Object with an awaitable ``send(content)`` used to create
                new messages.
            msg: Message currently being edited, or ``None`` to start a new one.
            text: Full text that should be visible in the current message.

        Returns:
            ``(msg, text)``: the message now being edited (``None`` if a new
            one still has to be created) and the text it holds.
        """
        limit = self.MAX_MESSAGE_LENGTH
        while len(text) > limit:
            head, text = text[:limit], text[limit:]
            if msg is None:
                if head.strip():
                    await handle.send(head)
            else:
                await msg.edit(content=head)
                # The current message is full; continue in a new one.
                msg = None
        # Discord refuses empty / whitespace-only content, so wait for real
        # text before creating or editing a message.
        if text.strip():
            if msg is None:
                msg = await handle.send(text)
            else:
                await msg.edit(content=text)
        return msg, text

    async def _stream_reply(self, handle, header, completion):
        """Relay a streamed completion to Discord, editing the reply as it grows."""
        end_of_stream = object()
        chunks = iter(completion)
        msg, shown = await self._show_text(handle, None, header)
        pending = ""
        while True:
            # Pulling the next chunk blocks on the network, so do it off-loop.
            # A sentinel is used because StopIteration cannot cross a future.
            chunk = await self._call_in_thread(next, chunks, end_of_stream)
            if chunk is end_of_stream:
                break
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if not isinstance(piece, str):
                continue
            pending += piece
            if len(pending) > self.STREAM_FLUSH_THRESHOLD:
                # Append verbatim: stripping here would delete the spaces and
                # newlines that happen to fall on a flush boundary.
                msg, shown = await self._show_text(handle, msg, shown + pending)
                pending = ""
        if pending:
            await self._show_text(handle, msg, shown + pending)

    async def send_message(self, message, user_message):
        """Ask Kimi about ``user_message`` and stream the answer to Discord.

        Args:
            message: Source of the request. Its ``followup`` is used for
                replies when present, otherwise its ``channel``.
            user_message: A prompt string, a ``discord.Attachment`` to
                summarise, or an ``(attachment, prompt)`` tuple.

        Any exception is logged and reported to the user on the same reply
        target instead of being raised.
        """
        handle = message.followup if hasattr(message, "followup") else message.channel
        try:
            custom_msg = None
            if isinstance(user_message, tuple):
                user_message, custom_msg = user_message
                header = f"> **{user_message}** - {custom_msg}\n\n"
            else:
                header = f"> **{user_message}** \n\n"

            msg_to_send = [
                {
                    "role": "system",
                    "content": INIT_SYS_PROMPT,
                },
            ]
            if isinstance(user_message, discord.Attachment):
                file_bytes = await user_message.read()
                # The SDK calls below are synchronous; run them in a worker
                # thread so the Discord event loop keeps running.
                file_object = await self._call_in_thread(
                    lambda: aiclient.files.create(file=file_bytes, purpose="file-extract")
                )
                logger.info(file_object.id)
                file_content = await self._call_in_thread(
                    lambda: aiclient.files.content(file_id=file_object.id).text
                )
                msg_to_send.append(
                    {
                        "role": "system",
                        "content": file_content,
                    }
                )
                msg_to_send.append(
                    {
                        "role": "user",
                        "content": "Please briefly introduce what this document talks about" if custom_msg is None else custom_msg,
                    },
                )
            else:
                msg_to_send.append(
                    {"role": "user", "content": user_message},
                )

            completion = await self._call_in_thread(
                lambda: aiclient.chat.completions.create(
                    model="moonshot-v1-32k",
                    messages=msg_to_send,
                    temperature=0.3,
                    stream=True,
                )
            )
            await self._stream_reply(handle, header, completion)
        except Exception as e:
            logger.exception(f"Error while sending : {e}")
            # Keep the report within Discord's content limit.
            detail = str(e)[:1500]
            await handle.send(
                f"> **ERROR: Something went wrong, please try again later!** \n ```ERROR MESSAGE: {detail}```"
            )
def run_discord_bot():
    """Register the event handler and slash commands, then run the bot.

    Registers ``/chat``, ``/chat_file`` and ``/chat_file_custom`` on the
    module-level ``client`` and calls ``client.run(DISCORD_BOT_TOKEN)``.
    """
    # Reference to the queue worker; kept so the task is not garbage
    # collected and so on_ready can tell whether one is already running.
    worker_task = None

    @client.event
    async def on_ready():
        nonlocal worker_task
        await client.tree.sync()
        # on_ready may be dispatched more than once; start the worker only
        # if none is running.
        if worker_task is None or worker_task.done():
            worker_task = asyncio.get_running_loop().create_task(client.process_messages())
        logger.info(f"{client.user} is now running!")

    async def submit(interaction, payload, command, detail):
        """Log the command, acknowledge the interaction and queue ``payload``."""
        if interaction.user == client.user:
            return
        username = str(interaction.user)
        client.current_channel = interaction.channel
        logger.info(f"\x1b[31m{username}\x1b[0m : /{command} [{detail}] in ({client.current_channel})")
        # Acknowledge before queueing so the worker never sends a follow-up
        # for an interaction that has not been deferred yet.
        await interaction.response.defer(thinking=True)
        await client.enqueue_message(interaction, payload)

    @client.tree.command(name="chat", description="Have a chat with Kimi!")
    async def chat(interaction: discord.Interaction, *, message: str):
        await submit(interaction, message, "chat", message)

    @client.tree.command(name="chat_file", description="Have a file chat with Kimi!")
    async def chat_file(interaction: discord.Interaction, *, file: discord.Attachment):
        await submit(interaction, file, "chat_file", file.content_type)

    @client.tree.command(name="chat_file_custom", description="Have a file chat with custom message with Kimi!")
    async def chat_file_custom(interaction: discord.Interaction, *, file: discord.Attachment, message: str):
        # The worker unpacks the tuple into (attachment, custom prompt).
        await submit(interaction, (file, message), "chat_file_custom", file.content_type)

    client.run(DISCORD_BOT_TOKEN)
# Module-level singletons used by dcclient's methods and run_discord_bot.
# They are created after the definitions above because setup_logger and
# dcclient must already exist when these lines run.
logger = setup_logger(__name__)
client = dcclient()

# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    run_discord_bot()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment