Slack script to call ChatGPT and participate in conversations when mentioned, summarize long threads, and query a sqlite database containing an organizational knowledge-base
""" | |
Single file app to run a GPT slackbot | |
Includes basic Text2SQL capabilities | |
1-) create application on slack | |
2-) enable socket mode | |
3-) create OAuth tokens | |
4-) add slash commands for /query and /summarize | |
5-) figure out your bot user ID | |
6-) enable the right scopes: | |
Bot Token Scopes | User Token Scopes | |
============================================= | |
app_mentions:read | channels:history | |
channels:history | chat:write | |
channels:read | groups:history | |
chat:write | im:history | |
commands | im:read | |
groups:history | im:write | |
groups:read | mpim:history | |
im:history | mpim:read | |
mpim:history | mpim:write | |
users:read | | |
7-) run from terminal, cron, or as a service | |
""" | |
from __future__ import print_function

# Python built-in modules
import os
import re
import time
import socket
import sqlite3

# Third-party modules
import openai
import tiktoken
from tabulate import tabulate
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
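
# The third-party imports above are all available on PyPI (sketch; versions
# are not pinned in this gist):
#   pip install openai tiktoken tabulate slack_bolt slack_sdk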
#############################
# load local-only credentials
#############################
# best to add credentials to a .env file
# in the same folder and keep it out of git
# (you can use `pip install python-dotenv`)
SLACK_BOT_TOKEN = "xoxb-..."
SLACK_APP_TOKEN = "xapp-..."
SLACK_BOT_USER = "U..."
OPENAI_API_KEY = "sk-..."
OPENAI_ORG = "org-..."
OPENAI_MODEL = "gpt-3.5-turbo"  # or "gpt-4"
TOKEN_LIMIT = 8000 if OPENAI_MODEL == "gpt-4" else 4000

# other params
DATA_DB = "..."  # sqlite file name, e.g. _.db
DATA_TABLE = "..."  # table name
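
# Optional sketch (assumes python-dotenv is installed and the .env keys below
# exist): load the values above from a .env file instead of hard-coding them,
# falling back to the literals when a variable is missing.
try:
    from dotenv import load_dotenv

    load_dotenv()
    SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN", SLACK_BOT_TOKEN)
    SLACK_APP_TOKEN = os.getenv("SLACK_APP_TOKEN", SLACK_APP_TOKEN)
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", OPENAI_API_KEY)
    # ...and similarly for the other settings
except ImportError:
    pass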
#########################################
# check there is an internet connection,
# otherwise the program fails when
# connecting to Slack
#########################################
def is_connected():
    """
    Check if there is an internet connection.
    Returns True if connected, False otherwise.
    """
    try:
        conn = socket.create_connection(("www.google.com", 80))
        if conn is not None:
            conn.close()
        return True
    except OSError:
        pass
    return False


while not is_connected():
    time.sleep(5)
print("is connected!")
########################
# initialize APIs
########################
app = App(token=SLACK_BOT_TOKEN)
web_client = WebClient(token=SLACK_BOT_TOKEN)
openai.organization = OPENAI_ORG
openai.api_key = OPENAI_API_KEY
encoding = tiktoken.encoding_for_model(OPENAI_MODEL)
########################
# Slack event listeners
########################
@app.event("app_mention")
def mention_handler(body, say):
    """
    Gets called when the bot is mentioned.
    The message is passed to GPT, along with channel context.
    """
    channel_id = body["event"]["channel"]
    channel_name = get_channel_name(channel_id)
    history = get_message_history(channel_id)
    response = chat_gpt(history, channel_name)
    say({"text": response, "response_type": "in_channel"})


# pylint: disable=unused-argument
@app.command("/summarize")
def call_gpt_summarize(ack, say, command):
    """
    Summarizes the conversation in the current channel, in a thread
    """
    ack()
    channel = command["channel_id"]
    prompt = f"<@{command['user_id']}> asked to summarize conversation"
    m_ts = web_client.chat_postMessage(channel=channel, text=prompt)["ts"]
    history = get_message_history(command["channel_id"])
    text = summarize_conversation(history)
    # summarize_conversation returns None for an empty history
    if text:
        web_client.chat_postMessage(channel=channel, text=text, thread_ts=m_ts)


@app.command("/query")
def data_query_command(ack, say, command):
    """
    Sends a natural language question to the DB
    """
    ack()
    question = command["text"].lower()
    query = text2sql(command["text"])
    if query:
        # post user question
        prompt = f"<@{command['user_id']}> asked: {question}"
        say({"text": prompt, "response_type": "in_channel"})
        # post query
        say({"text": f"```\n{query}\n```\n", "response_type": "in_channel"})
        data = fetch(query)
        # display raw data as table
        split_markdown(data, say)


# pylint: disable=unused-argument
@app.event("message")
def handle_message_events(body, logger):
    """
    Catch-all for other message events; without it, unhandled requests get logged as errors.
    """
#############################
# Slack helper functions
#############################
def get_slack_users():
    """
    Gets the list of users so they can be referred to by name.
    Otherwise, all we know are their IDs.
    """
    result = web_client.users_list()
    result = {usr["id"]: usr["name"] for usr in result["members"]}
    return result


def get_channel_name(channel_id):
    """
    Gets the channel name for a channel ID.
    """
    # Get the channel information using the conversations.info API method
    response = web_client.conversations_info(channel=channel_id)
    # Extract the channel name from the response
    return response["channel"]["name"]


USERS = get_slack_users()
def get_message_history(channel_id):
    """
    Gets message history, including threads, and removes unnecessary components.
    """
    messages = get_channel_messages(channel_id)
    history = get_threads(channel_id, messages)
    history = [
        # fall back to the raw ID if the user is not in the cached lookup
        {"text": replace_user_id(conv["text"]), "user": USERS.get(conv["user"], conv["user"])}
        for conv in history
    ]
    history = list(reversed(history))
    return history


def get_channel_messages(channel_id):
    """
    Retrieves messages from a channel.
    """
    try:
        result = web_client.conversations_history(channel=channel_id)
        messages = result["messages"]
        return messages
    except SlackApiError as error:
        print(f"Error: {error}")
        return []


def get_threads(channel_id, messages):
    """
    Retrieves threads for messages that contain them and
    flattens the whole structure into a single list.
    """
    all_messages = []
    for message in messages:
        all_messages.append(message)
        if "thread_ts" in message:
            try:
                result = web_client.conversations_replies(
                    channel=channel_id, ts=message["ts"]
                )
                # for some reason threads are ordered opposite to messages
                thread_messages = list(reversed(result["messages"]))
                all_messages.extend(thread_messages)
            except SlackApiError as error:
                print(f"Error: {error}")
    return all_messages
def split_markdown(code, say):
    """
    Slack is bad at rendering large code blocks, so this works around it by
    splitting text intended as markdown into multiple smaller blocks.
    """
    if isinstance(code, str):
        lines = code.splitlines()
        sections = ["\n".join(lines[i : i + 10]) for i in range(0, len(lines), 10)]
        for section in sections:
            table = "```\n" + section + "\n```"
            say({"text": table, "response_type": "in_channel"})


def replace_user_id(message):
    """
    Replaces user IDs with user names.
    Also strips a leading "name:" prefix from the message.
    """

    def helper(match):
        """
        Replaces a single matched user ID with the user name.
        """
        user_id = match.group(1)
        return USERS.get(user_id, f"<@{user_id}>")

    pattern = r"<@([A-Z0-9]+)>"
    # remove "name:" at the start of the message
    message = re.sub(r"^\w+:\s*", "", message)
    # replace mentions/ids with names
    return re.sub(pattern, helper, message)
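
# Illustration of replace_user_id (hypothetical IDs and names): a raw message
# such as "kevin: thanks <@U02ABCDEF>" becomes "thanks jane", assuming USERS
# maps "U02ABCDEF" to "jane".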
def get_root_dir():
    """
    Returns the root dir for the main file, in this case app.py.
    """
    # pylint: disable=no-member
    main_script_path = os.path.abspath(__file__)
    main_script_dir = os.path.dirname(os.path.realpath(main_script_path))
    return main_script_dir


#############################
# SQLite functions
#############################
def text2sql(text):
    """
    Takes a text prompt and returns SQL.
    """
    schema = get_json_schema(DATA_TABLE)
    rows, cols = data_query(f"SELECT * FROM {DATA_TABLE};")
    markdown_table = get_markdown_table(rows, cols, pretty=False)
    query = call_gpt(
        get_prompt_template(
            "query",
            params={
                "table": DATA_TABLE,
                "schema": schema,
                "data": markdown_table,
                "question": text,
            },
        )
    ).lower()
    # remove code fences which appear occasionally
    query = query.replace("sql", "")
    query = query.replace("```", "")
    return query.strip()
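
# Illustration only (not actual model output): for a question like
# "how many rows are there?", the model is expected to come back with
# something along the lines of "select count(*) from <DATA_TABLE>".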
def fetch(query):
    """
    Takes a query and returns the data as a markdown table.

    Leftover idea (unused; there is no "explain_query" prompt in PROMPTS):
        call_gpt(
            get_prompt_template(
                "explain_query",
                params={"question": question, "query": query, "response": rows},
            )
        )
    """
    rows, cols = data_query(query)
    data = get_markdown_table(rows, cols)
    return data


def get_markdown_table(rows, cols, pretty=True):
    """
    Returns a markdown table for one or more items.
    """
    # make headers multi-line
    if pretty:
        cols = [col.replace("_", "\n") for col in cols]
    return tabulate(rows, cols, floatfmt=".2f", intfmt=",")


def data_query(query):
    """
    Runs the actual query.
    """
    rows = []
    cols = []
    if query:
        conn, cursor = get_db_conn()
        cursor.execute(query)
        cols = [col[0] for col in cursor.description]
        rows = cursor.fetchall()
        conn.close()
    return rows, cols
def get_json_schema(table):
    """
    Gets the schema for the table.
    """
    # Retrieve the schema of the table and convert it to a text description
    rows, _ = data_query(f"PRAGMA table_info({table})")
    schema = [f'column: "{row[1]}", type: "{row[2]}"' for row in rows]
    schema = "\n".join(schema)
    return schema


def get_db_conn(mode="ro", db_path=None):
    """
    Parameters:
        mode (string): default 'ro' (read only), alternatively 'rw' (read write).
        db_path (path): path where the db is located; by default it is resolved
            relative to this file using DATA_DB.
    Returns:
        conn (sqlite connection)
        cursor (sqlite cursor)
    """
    if db_path is None:
        db_path = os.path.join(get_root_dir(), DATA_DB)
    # Connect to the database (read-only by default)
    conn = sqlite3.connect(f"file:{db_path}?mode={mode}", uri=True)
    # Create a cursor object
    cursor = conn.cursor()
    return conn, cursor
#############################
# GPT functions
#############################
def trim_list_by_tokens(str_list, prompt_header):
    """
    Takes a list of strings and trims it based on the number of tokens allowed.
    This approach is useful to avoid cutting messages in half.
    Discounts the token length of the prompt header.
    """
    count = 0
    ret = []
    if isinstance(str_list, list):
        prompt_token_len = len(encoding.encode(prompt_header)) + 25  # add a buffer
        # discount tokens for the separator
        prompt_token_len += len(encoding.encode("\n\n###\n\n")) * len(str_list)
        # we reverse it so we only end up cropping the oldest messages
        for item in reversed(str_list):
            tokens = encoding.encode(item)
            count += len(tokens)
            if count > TOKEN_LIMIT - prompt_token_len:
                break
            ret.append(item)
        # we reverse the list again so it's in chronological order
        ret = list(reversed(ret))
    ret = "\n\n###\n\n".join(ret)
    return ret
def summarize_conversation(history):
    """
    Uses GPT to summarize a conversation between various participants.
    """
    if len(history) > 0:
        history = [f'{conv["user"]}: {conv["text"]}' for conv in history]
        history = trim_list_by_tokens(history, get_prompt_template("summarize"))
        return call_gpt(get_prompt_template("summarize", {"history": history}))
    return None


def chat_gpt(history, channel):
    """
    Calls GPT with context, so it behaves like chat.
    """
    history = [f'{conv["user"]}: {conv["text"]}' for conv in history]
    history = trim_list_by_tokens(history, get_prompt_template("bot_mention"))
    prompt = get_prompt_template(
        "bot_mention", {"history": history, "channel": channel}
    )
    response = call_gpt(prompt)
    return response


def call_gpt(prompt, model=OPENAI_MODEL):
    """
    Calls GPT.
    """
    response = openai.ChatCompletion.create(
        model=model,
        messages=[
            {"role": "system", "content": get_prompt_template("system")},
            {"role": "user", "content": prompt},
        ],
    )
    return response.choices[0].message.content


def get_prompt_template(template, params=None):
    """
    Params need to be a dict with keys that match the template placeholders.
    """
    content = PROMPTS[template]
    if isinstance(params, dict):
        content = content.format(**params)
    return content
PROMPTS = {
    "system": """You are a helpful assistant named scopy that works for a venture capital company named ScOp VC.""",
    "bot_mention": """You are a helpful assistant that is part of a venture capital firm, named ScOp. Your name is scopy.
    Below you will see a conversation in a particular slack channel among several members of this venture
    capital firm named ScOp. The next time someone asks a question, please consider the existing conversation
    context and provide a helpful answer. Sometimes, the answer will be unrelated to the conversation,
    in which case you should come up with your own. The name of the channel is {channel}.
    The channel name might be related to one of the companies in ScOp's portfolio. Always be nice and respectful,
    but if the person asking the question is named Kevin, answer accurately but in a funny tone, as if you are
    exasperated because Kevin is always asking silly questions. The last paragraph is the user's most recent
    question, which is intended for you, scopy. \n\n {history} \n\n###\n\n """,
    "query": """Please regard the following table, named "{table}" with the following schema: \n###\n ```\n {schema} \n``` \n###\n
    Here's what the data looks like: \n###\n ```\n {data} \n``` \n###\n
    Write a SQL query, without any explanation of any kind, just the SQL query,
    to answer the following question: {question}""",
    "summarize": """Please write a thorough summary of the following conversation. There are many participants.
    Each time a participant speaks, a paragraph will start with the person's name.
    The conversation follows below: \n\n###\n\n {history}""",
}
if __name__ == "__main__": | |
handler = SocketModeHandler(app, SLACK_APP_TOKEN) | |
handler.start() |
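
# To run (sketch): `python app.py` from a terminal. SocketModeHandler keeps a
# websocket open to Slack, so for cron or service use, wrap it in something
# that restarts the process on exit (e.g. systemd or supervisor).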