Skip to content

Instantly share code, notes, and snippets.

@hargup
Created June 7, 2024 03:18
Show Gist options
  • Save hargup/edae536e7803aaf829823527ef7bb645 to your computer and use it in GitHub Desktop.
Save hargup/edae536e7803aaf829823527ef7bb645 to your computer and use it in GitHub Desktop.
# A basic telegram bot which can respond to messages and voice notes
# How to create a telegram bot
#
# To get the bot token, follow these steps:
# 1. Open the Telegram app and search for @BotFather.
# 2. Start a chat with BotFather and click on /newbot.
# 3. Follow the instructions given by BotFather to create a new bot.
# 4. After successfully creating the bot, BotFather will give you a token. This is your bot token.
# 5. Replace '<replace with yours>' in the TELE_BOT_TOKEN constant with your bot token.
#
# That's that, you get the token, run this file and your bot is now working. The bot works by infinite polling
# Only one instance of the bot can run at a time
import telebot
import os
from dotenv import load_dotenv
import requests
import json
import time
from deepgram import (
DeepgramClientOptions,
DeepgramClient,
PrerecordedOptions,
FileSource,
)
# from fastapi import FastAPI
import uvicorn
import threading
from openai import OpenAI
# Load .env.local FIRST so that the keys it defines are already in the process
# environment when the clients below read them (the OpenAI client looks up
# OPENAI_API_KEY at construction time).
load_dotenv('.env.local')

# Constants
# The token may be supplied through the TELE_BOT_TOKEN environment variable;
# otherwise the placeholder below must be replaced with the token from BotFather.
TELE_BOT_TOKEN = os.getenv("TELE_BOT_TOKEN", '<replace with yours>')
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")

OpenAIclient = OpenAI(
    # This is the default and can be omitted
    api_key=os.environ.get("OPENAI_API_KEY"),
)

# Uptime monitoring on this is done using https://healthchecks.io
# app = FastAPI()
bot = telebot.TeleBot(TELE_BOT_TOKEN, parse_mode=None)  # You can set parse_mode by default. HTML or MARKDOWN
@bot.message_handler(commands=['ping'])
def send_welcome(message):
    """Reply "pong" to a /ping command (simple liveness check).

    NOTE(review): the /start,/help handler further down reuses the name
    ``send_welcome``; presumably both stay active because registration
    happens in the decorator -- confirm before calling either by name.
    """
    pong_text = "pong"
    bot.reply_to(message, pong_text)
@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    """Greet the user in response to the /start or /help command."""
    greeting = "Howdy, how are you doing?"
    bot.reply_to(message, greeting)
@bot.message_handler(func=lambda message: True)
def echo_all(message):
    """Echo the incoming message text back to the sender with "?" appended.

    The filter function accepts every message handed to it, so this acts as
    the catch-all for messages not claimed by the command handlers above.
    """
    echoed_text = message.text + "?"
    bot.reply_to(message, echoed_text)
def voice_file_to_transcript(file_path):
    """Transcribe the audio file at ``file_path`` using Deepgram.

    Args:
        file_path: Path of the audio file to read and send for transcription.

    Returns:
        The transcript of the first alternative of the first channel, or
        ``None`` if anything goes wrong (the exception is printed, not raised).
    """
    try:
        # An empty key string is passed on purpose; presumably the SDK then
        # falls back to the DEEPGRAM_API_KEY environment variable -- confirm.
        client_options: DeepgramClientOptions = DeepgramClientOptions()
        client: DeepgramClient = DeepgramClient("", client_options)

        # Read the whole recording into memory and wrap it as a buffer source.
        with open(file_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        source: FileSource = {"buffer": audio_bytes}

        request_options = PrerecordedOptions(
            model="nova",
            smart_format=True,
            utterances=True,
            punctuate=True,
            diarize=True,
        )

        prerecorded = client.listen.prerecorded.v("1")
        response = prerecorded.transcribe_file(source, request_options)

        # Round-trip through JSON to get plain dicts/lists to index into.
        parsed = json.loads(response.to_json())
        first_alternative = parsed['results']['channels'][0]['alternatives'][0]
        return first_alternative['transcript']
    except Exception as e:
        # Best effort: report the problem and signal failure with None.
        print(f"Exception: {e}")
        return None
@bot.message_handler(content_types=['voice', 'audio'])
def handle_voice_msg(message):
    """Transcribe a voice/audio message and reply with transcript and summary.

    Steps: download the attachment from Telegram, transcribe it with
    ``voice_file_to_transcript``, reply with the transcript, then ask the
    OpenAI chat model for a bullet summary with action items and reply with it.

    Args:
        message: The incoming Telegram message carrying a ``voice`` or
            ``audio`` attachment.
    """
    import tempfile

    print("processing voice message")

    # The handler is registered for both content types, so pick whichever
    # attachment is present instead of assuming ``message.voice``.
    media = message.voice or message.audio
    file_info = bot.get_file(media.file_id)
    downloaded_file = bot.download_file(file_info.file_path)

    author = message.from_user.first_name
    print(f'author: {author}')

    # Use a unique temporary file (a seconds-resolution timestamp name can
    # collide when two notes arrive together) and always delete it afterwards
    # so recordings do not pile up on disk.
    suffix = os.path.splitext(file_info.file_path or "")[1] or '.ogg'
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as new_file:
        new_file.write(downloaded_file)
        voice_file_name = new_file.name
    try:
        transcript = voice_file_to_transcript(voice_file_name)
    finally:
        os.remove(voice_file_name)

    # voice_file_to_transcript returns None on failure; do not echo "None"
    # to the user or spend an OpenAI call summarising nothing.
    if not transcript:
        bot.reply_to(message, "Sorry, I could not transcribe this voice note.")
        return

    bot.reply_to(message, f"Transcript:\n{transcript}")

    # Prompt text is kept exactly as before; only its layout in code changed.
    instructions = (
        f"Above is a transcript of a voice note by '{author}', give the summary in bullets, if there are action items, create a seperate section for it.\n"
        "Format:\n"
        "Summary:\n"
        "- <main point 1, 2-3 setences>\n"
        "- <main point 1, 2-3 setences>\n"
        "Action Items:\n"
        "- <action item 1>\n"
        "- <action itme 2>"
    )
    messages = [
        {"role": "user", "content": transcript},
        {"role": "user", "content": instructions},
    ]
    response = OpenAIclient.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
    summary = response.choices[0].message.content

    # Reply to the message with the summary
    bot.reply_to(message, summary)
if __name__ == "__main__":
    # The heartbeat thread is currently disabled:
    # heartbeat_thread = threading.Thread(target=heartbeat)
    # heartbeat_thread.start()

    # Run the bot's polling loop on its own thread.
    bot_thread = threading.Thread(target=bot.infinity_polling)
    bot_thread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment