Discord llama.cpp chat bot with context
import asyncio
import discord  # discord.py==1.7.3
import re
import requests
from discord.ext import commands
# llama.cpp front-end chat bot for Discord
# by av1d https://gist.github.com/av1d/
#
# The script is scalable: it retains separate context per channel ID.
# Tested with stablelm-zephyr-3b.Q4_K_M.gguf.
# You may need to change the prompt template if you use other models.
#
# Trigger is . followed by your message.
# .help or .? for help.
""" | |
MIT License | |
Copyright (c) 2024 av1d | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
intents = discord.Intents.default()
intents.typing = False
intents.presences = False

bot = commands.Bot(command_prefix='.', intents=intents)
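# Note (assumption, not part of the original script): this targets
# discord.py 1.7.3 as pinned in the import above. If you run
# discord.py 2.x instead, you would also need to enable the message
# content intent for on_message to see message text, roughly:
#     intents.message_content = True
# plus toggling "Message Content Intent" in the Discord developer portal.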
LLAMA_IP = '192.168.0.196'
LLAMA_PORT = '8080'

# How many previous exchanges (message pairs between the user and llama)
# to retain as chat history to enrich the context. The longer this is,
# the slower the response will be, especially on CPU.
CONTEXT_LENGTH = 3
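# For reference, the `conversation` dict defined near the bottom of this
# file ends up shaped like this (illustrative values, not real data):
#
#     conversation = {
#         123456789012345678: {                  # Discord channel ID
#             "user":  ["hi", "tell me more"],   # last N user messages
#             "llama": ["Hello!", "Sure..."],    # last N model replies
#         },
#     }
#
# The two lists are kept in lockstep and capped at CONTEXT_LENGTH entries.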
def llamatize(
        author_message: str,
        author_name: str,
        channel_id: int = None
) -> dict:

    ai_name = "Mr. Bass"

    system_prompt = (
        f"This is a conversation between {author_name} and {ai_name}, "
        f"a friendly chatbot. {ai_name} is helpful, kind, honest, "
        f"good at writing, and never fails to "
        f"answer any requests immediately and with precision. "
    )

    # maintain the context of the conversation
    contextual_prompt = []
    if channel_id is not None:  # if there is a chat history
        user = conversation[channel_id]['user']  # get user chat history
        llama = conversation[channel_id]['llama']
        for u, l in zip(user, llama):  # iterate the history
            # replace newlines with escaped newlines
            u = u.replace("\n", "\\n")
            l = l.replace("\n", "\\n")
            # add histories to the list
            contextual_prompt.append(f"User: {u}")
            contextual_prompt.append(f"Llama: {l}")
        # concatenate list to string
        contextual_prompt = "\n".join(contextual_prompt)
        # combine the system prompt & our chat history context to form
        # the overall prompt. Keep it a plain string so it interpolates
        # cleanly into the template below.
        system_prompt = system_prompt + contextual_prompt

    # use the appropriate prompt template. This one is for stablelm-zephyr
    llama = (
        f"<|system|>{system_prompt}<|endoftext|>\n"
        f"<|user|>\n{author_message}<|endoftext|>\n"
        f"<|assistant|>"
    )
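    # A hedged sketch, not from the original script: if you swap in a
    # ChatML-style model (e.g. many Qwen or OpenHermes GGUFs), the
    # template above would instead look roughly like:
    #
    #     llama = (
    #         f"<|im_start|>system\n{system_prompt}<|im_end|>\n"
    #         f"<|im_start|>user\n{author_message}<|im_end|>\n"
    #         f"<|im_start|>assistant\n"
    #     )
    #
    # Check your model card for the exact template it was trained with.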
    payload = {
        "prompt": llama,
        "n_predict": 256
    }
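    # The llama.cpp server's /completion endpoint accepts more fields
    # than the two used here; for example (values illustrative, not tuned):
    #
    #     payload["temperature"] = 0.7          # sampling temperature
    #     payload["stop"] = ["<|endoftext|>"]   # extra stop sequences
    #
    # See the llama.cpp server README for the full parameter list.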
    LLAMA_SERVER_URL = f"http://{LLAMA_IP}:{LLAMA_PORT}/completion"

    headers = {
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(
            LLAMA_SERVER_URL,
            json=payload,
            headers=headers
        )
    except Exception as e:
        error_msg = (
            f"Error: {e}.\n\n"
            f"Is the llama.cpp server running?"
        )
        print(error_msg)
        erroneous = {
            "success": False,
            "content": error_msg
        }
        return erroneous

    if response.status_code == 200:
        answer = response.json()
        successful = {
            "success": True,
            "content": answer['content']
        }
        return successful
    else:
        error_msg = f"Error: {response.status_code}"
        print(error_msg)
        erroneous = {
            "success": False,
            "content": error_msg
        }
        return erroneous
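
# llamatize() always returns a dict with the same two keys, e.g.:
#     {"success": True,  "content": "Hello! How can I help?"}
#     {"success": False, "content": "Error: 404"}
# so callers can branch on ["success"] without touching requests directly.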
def remove_incomplete_sentence(input_text: str) -> str:
    # Remove the last sentence if it is incomplete. This is extremely
    # crude and has side effects: it will chop off emoji and other
    # things at the ends of sentences. Experimental...
    sentences = re.split(r'(?<=[.!?])\s+', input_text.strip())
    if len(sentences) > 0 and not re.search(r'[.!?]$', sentences[-1]):
        del sentences[-1]
    result = ' '.join(sentences)
    return result
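
# For example:
#     remove_incomplete_sentence("It works. It is fast. But the")
#     -> "It works. It is fast."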
def chunk_message(s, chunk_size=1800) -> list:
    chunks = []
    for i in range(0, len(s), chunk_size):
        chunks.append(s[i:i + chunk_size])
    length = len(chunks)
    print(f"* msg too long, split into {length} chunks")
    return chunks
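
# For example, a 4000-character string comes back as three chunks of
# 1800, 1800, and 400 characters, each safely under Discord's 2000-char
# message limit:
#     chunk_message("a" * 4000)  # -> ["a"*1800, "a"*1800, "a"*400]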
@bot.event
async def on_ready():
    print(f'Logged in as {bot.user.name} ({bot.user.id})')
    # Update the bot's status message
    activity_type = discord.ActivityType.playing
    activity_name = "type . then your message, .? for help"
    activity = discord.Activity(type=activity_type, name=activity_name)
    await bot.change_presence(activity=activity)
@bot.event
async def on_message(message):
    global conversation  # conversational history (chat context)

    # comment out these next two lines of code for a feedback loop
    # where the bot responds to itself. For science :P
    if message.author == bot.user:  # check if message is from the bot itself
        return  # ignore messages from the bot itself

    # Get the server name, server ID, and channel name
    server_name = (
        message.guild.name
        if message.guild
        else "Direct Message"
    )  # server name or DM
    channel_name = (
        message.channel.name
        if hasattr(message.channel, 'name')
        else "Unknown"
    )  # channel name
    server_id = (
        message.guild.id
        if message.guild
        else "N/A"
    )  # server ID
    channel_id = message.channel.id  # get the channel ID

    author_name = str(message.author)  # get username
    author_name = author_name.split('#')[0]  # remove discriminator
    author_message = str(message.content)  # get user message

    # commands which trigger help
    help_commands = ['.help', '.?']
    # commands to clear chat history/context
    clear_context_commands = ['.clear', '.erase', '.delete', '.x']

    # if the user needs help
    if author_message in help_commands:
        clear_context_commands_string = (
            ', '.join(clear_context_commands)
        )
        help_message = (
            f"Start your message with a `.` (period).\n"
            f"To clear chat context, use one of these commands:\n"
            f"`{clear_context_commands_string}`"
        )
        await message.channel.send(help_message)

    # if the user wants to clear chat history/context,
    # drop this channel's entry from the conversation dict
    elif author_message in clear_context_commands:
        try:
            print(f"current context: {conversation}")
            conversation.pop(channel_id, None)  # clear for current channel
            await message.channel.send("`context has been cleared`")
            print(f"new context: {conversation}")
        except Exception as e:
            print(
                f"User tried to clear context but "
                f"context doesn't exist. {e}"
            )
    # otherwise, treat a leading '.' as the chat trigger
    # (startswith avoids an IndexError on empty messages)
    elif author_message.startswith('.'):
        # print incoming commands and their origin
        print(
            f"{server_name} "
            f"({server_id}) - {channel_name}: "
            f"{message.author}: {message.content}"
        )
        # remove the '.' trigger from the beginning of the string
        author_message = author_message[1:]

        # if conversation history is empty for this channel:
        if (channel_id not in conversation or
                not conversation[channel_id]["user"]):
            # send to the llama api without history
            answer = llamatize(author_message, author_name)
        else:  # the channel has a conversation history,
            # so send the channel ID too, so llamatize() knows which
            # dictionary key to retrieve chat history context from
            answer = llamatize(author_message, author_name, channel_id)
        if answer['success']:
            # get the actual answer content of the response
            answer = answer['content']

            # update the conversation log to maintain the context of
            # the overall conversation
            if channel_id not in conversation:
                conversation[channel_id] = {
                    "user": [],
                    "llama": []
                }

            # append the user message to the "user" list
            conversation[channel_id]["user"].append(author_message)
            # limit both lists to CONTEXT_LENGTH items
            if len(conversation[channel_id]["user"]) > CONTEXT_LENGTH:
                # remove the oldest item from "llama"
                conversation[channel_id]["llama"].pop(0)
                # remove the oldest item from "user"
                conversation[channel_id]["user"].pop(0)
            # append the llama response to the list
            conversation[channel_id]["llama"].append(answer)

            print(conversation)
            # if the answer is non-empty and under the message limit
            if 0 < len(answer) < 1800:
                # remove any incomplete sentence
                answer = remove_incomplete_sentence(answer)
                # send the message
                await message.channel.send(answer)
            # if over 1800 chars, split it up. The Discord limit is
            # 2000 chars per message, but idk how much of that is
            # metadata, if any, so we'll leave some headroom for now.
            elif len(answer) > 1800:
                answer = remove_incomplete_sentence(answer)
                # split into chunks
                answer_chunks = chunk_message(answer)
                # send each chunk separately
                for chunk in answer_chunks:
                    await message.channel.send(chunk)
                    # avoid rate limiting (2 secs between messages)
                    await asyncio.sleep(2)
            else:  # empty answer['content']
                await message.channel.send(
                    "The request was successful but the model "
                    "produced no usable content."
                )
        else:  # error from the llama api
            await message.channel.send(
                f"There was an error. {answer['content']}"
            )
            # nothing was appended to the context on this turn (that
            # only happens on success), so the "user" and "llama" lists
            # are still in sync and no cleanup is needed

conversation = {}  # conversational history (chat context), keyed by channel ID

bot.run(
    ''
)  # paste your key between the quotes