Skip to content

Instantly share code, notes, and snippets.

@Happy-Ferret
Forked from kaecy/TelegramBotFramework.py
Created February 12, 2022 18:12
Show Gist options
  • Save Happy-Ferret/828a46538b1bae3afee8105d4ca55d67 to your computer and use it in GitHub Desktop.
Save Happy-Ferret/828a46538b1bae3afee8105d4ca55d67 to your computer and use it in GitHub Desktop.
Simple Telegram Bot API for Python. (requires python 3)

TelegramBotFramework

You can find a basic bot here.

message contains:
    str:
        A string of text received from a message.
chat contains:
    Chat object
        The chat object connected to the message.
extras contains:
    map:
        chatId:int
            The id of the chat the message is connected to.
        messageId:int
            The id of the message received.
        private:bool
            Whether the message was sent in a private chat.
        sender:str
            The one who sent the message.
        senderId:int
            The id of the sender.
        stickerMsg:bool
            Whether the message is a sticker message.
        replyMsg:bool
            Whether the message is a reply to another message.
        replyingToMsgId:int
            The id of the message being replied to.
        replyingToMsgUserObj:object
            The User object of the user of the message being replied to.
# jnet - library for requesting json
# Version: 0.0.4
import json
import socket
import urllib.error
import urllib.parse
import urllib.request
JSONType = "application/json"


# Request JSON from a web resource.
# Only returning an object if there is application/json data.
def requestJson(uri, params=None, extras=None):
    """Perform an HTTP request and decode a JSON response body.

    Args:
        uri: Target URL.
        params: Optional mapping (or sequence of pairs) that is URL-encoded
            and appended to ``uri`` as the query string.
        extras: Optional mapping with any of the keys ``"method"`` (HTTP
            verb, default ``"GET"``), ``"headers"`` (merged over the default
            user-agent header) and ``"payload"`` (request body bytes).

    Returns:
        The decoded JSON value when the response content type starts with
        ``application/json``; ``None`` for any other content type or when
        the request times out (60 second limit).
    """
    # Build a fresh dict per call; a mutable default would be shared by
    # every call that omits the argument.
    extras = {} if extras is None else extras

    reqUri = uri
    if params:
        reqUri += "?" + urllib.parse.urlencode(params)

    headers = {"user-agent": "alpha/jnet"}
    headers.update(extras.get("headers", {}))

    req = urllib.request.Request(
        reqUri,
        data=extras.get("payload"),
        headers=headers,
        method=extras.get("method", "GET"),
    )

    try:
        res = urllib.request.urlopen(req, None, 60)
    except socket.timeout:
        print("JNet: Timeout.")
        return None
    except urllib.error.HTTPError as exception:
        # An HTTP error is still a readable response; its body is parsed
        # below exactly like a successful one.
        res = exception
    except urllib.error.URLError as exception:
        # Timeouts raised while connecting arrive wrapped in URLError.
        if isinstance(exception.reason, socket.timeout):
            print("JNet: Timeout.")
            return None
        raise

    try:
        content = res.read()
        # `.headers` exists on every urllib response object, whereas
        # `getheader` is only provided by HTTP responses.
        contentType = res.headers.get("content-type", "")
    finally:
        res.close()

    if contentType.startswith(JSONType):
        return json.loads(content)
    return None
# Multipart form library
# Created 08 Nov, 2020
# v0.0.1
import os
import io
import sys
import socket, ssl
import urllib.parse
import mimetypes
import enum
class FormContentType(enum.IntEnum):
    """Kind of payload carried by a single multipart form part."""

    Void = 0
    Text = 1
    File = 2


class FormContent:
    """One part of a multipart/form-data body.

    Attributes:
        name: Form field name.
        type: A ``FormContentType`` member.
        data: The text itself for ``Text`` parts, the file path for ``File``
            parts, ``None`` for ``Void`` parts.
        size: Payload length in bytes.
        headers: Header lines of this part, without line terminators.
    """

    def __init__(self, name, type=FormContentType.Void, data=None):
        """Build the part headers and compute the payload size.

        Args:
            name: Form field name.
            type: ``FormContentType`` selecting how ``data`` is interpreted.
            data: Text (``Text``) or path of an existing file (``File``).
        """
        self.name = name
        self.type = type
        self.size = 0
        self.data = None
        self.headers = [
            'Content-Disposition: form-data; name="%s"' % (name)
        ]
        if type == FormContentType.Text:
            self.data = data
            # Count encoded bytes, not characters: the text is encoded
            # before it is written, so non-ASCII text is longer on the wire.
            self.size = len(data.encode())
        elif type == FormContentType.File:
            basename = os.path.basename(data)
            self.data = data
            self.size = os.path.getsize(data)
            # Apply the fallback to the guessed type itself; applying `or`
            # to the already formatted header string never triggers.
            mimeType = mimetypes.guess_type(data)[0] or "application/octet-stream"
            self.headers = [
                'Content-Disposition: form-data; name="%s"; filename="%s"' % (name, basename),
                'Content-Type: %s' % (mimeType),
            ]
class Form:
    """A multipart/form-data form: an ordered list of parts plus headers.

    ``headers`` always holds the Content-Type and Content-Length request
    headers matching the current ``contents``; it is recomputed whenever a
    part is added.
    """

    def __init__(self):
        self.boundary = "666-devils-666"
        self.headers = []
        self.contents = []  # array of FormContent
        self.calculateSize()

    def calculateSize(self):
        """Recompute the request headers from the current parts.

        The length is measured in encoded bytes, because boundary and
        header lines are encoded before being written to the connection.
        """
        boundaryLength = len(self.boundary.encode())
        # closing delimiter: "--" + boundary + "--"
        size = boundaryLength + 4
        for content in self.contents:
            # opening delimiter: "--" + boundary + CRLF
            size += boundaryLength + 4
            # each header line plus its CRLF
            size += sum(len(header.encode()) + 2 for header in content.headers)
            # blank line terminating the part headers
            size += 2
            # payload followed by CRLF
            size += content.size + 2
        self.headers = [
            "Content-Type: multipart/form-data; boundary=" + self.boundary,
            "Content-Length: " + str(size)
        ]

    def addText(self, name, text):
        """Append a text field called ``name`` with value ``text``."""
        self.contents.append(FormContent(name, FormContentType.Text, text))
        self.calculateSize()

    def addFile(self, name, file):
        """Append the file at path ``file`` as field ``name``."""
        self.contents.append(FormContent(name, FormContentType.File, file))
        self.calculateSize()
class SocketFile(io.FileIO):
    """File object exposing socket-style ``send``/``recv`` methods.

    Everything passed to ``send`` is written to the file; ``recv`` never
    reads anything and only hands back a fixed placeholder.
    """

    def send(self, data):
        """Write ``data`` to the file; returns nothing."""
        self.write(data)

    def recv(self, i):
        """Return the placeholder reply; the requested size ``i`` is ignored."""
        placeholder = b"Nothing returned"
        return placeholder
class BasicRequest:
    """Minimal HTTP/1.0 client that POSTs a multipart form over TLS."""

    def post(self, url, form, service=None):
        """Send ``form`` to ``url`` and return the parsed response.

        Args:
            url: Absolute URL to post to.
            form: Object providing ``headers``, ``boundary`` and
                ``contents`` (see ``Form``).
            service: Optional transport with ``send``, ``recv`` and
                ``close`` methods. When omitted, a TLS connection to the
                URL's host is opened (port 443 unless the URL names one).

        Returns:
            The dict produced by ``parseResponse``.
        """
        # Parse unconditionally: path and host are needed for the request
        # line even when the caller supplies its own transport.
        url = urllib.parse.urlparse(url)
        if service is None:
            service = self._connect(url)
        try:
            self._send(service, ("POST %s HTTP/1.0\r\n" % (url.path or "/")).encode())
            self._send(service, ("Host: %s\r\n" % (url.netloc)).encode())
            print("Writing http request headers")
            self.writeHeaders(form.headers, service)
            self._send(service, "\r\n".encode())
            print("Writing body")
            self.writeContent(form, service)
            print("Waiting for response")
            return self.parseResponse(service)
        finally:
            # Release the transport even when sending or parsing fails.
            service.close()

    @staticmethod
    def _connect(url):
        """Open a certificate-verified TLS connection to the host of ``url``."""
        # The default context verifies the certificate chain and the host
        # name; a bare SSLContext does neither.
        context = ssl.create_default_context()
        sock = socket.create_connection((url.hostname, url.port or 443))
        try:
            return context.wrap_socket(sock, server_hostname=url.hostname)
        except Exception:
            sock.close()
            raise

    @staticmethod
    def _send(service, data):
        """Write all of ``data``, preferring ``sendall`` when the transport has it."""
        # A socket's send() may write only part of the buffer; file-like
        # stand-ins offer send() only.
        sender = getattr(service, "sendall", None) or service.send
        sender(data)

    def writeHeaders(self, headers, service):
        """Write each header line followed by CRLF."""
        for header in headers:
            self._send(service, header.encode())
            self._send(service, "\r\n".encode())

    def writeContent(self, form, service):
        """Write every part of ``form`` and then the closing delimiter."""
        delimiter = form.boundary.encode()
        for content in form.contents:
            self._send(service, "--".encode())
            self._send(service, delimiter)
            self._send(service, "\r\n".encode())
            self.writeHeaders(content.headers, service)
            self._send(service, "\r\n".encode())
            if content.type == FormContentType.Text:
                self._send(service, content.data.encode())
            elif content.type == FormContentType.File:
                print("writing file")
                with open(content.data, "rb") as f:
                    while data := f.read(4096):
                        self._send(service, data)
            self._send(service, "\r\n".encode())
        self._send(service, "--".encode())
        self._send(service, delimiter)
        self._send(service, "--".encode())

    def parseResponse(self, service):
        """Read the whole response from ``service`` and split it up.

        Returns:
            Dict with ``"status"`` (status code as a string, or ``0`` when
            no status line was received), ``"headers"`` (name to value,
            names as sent by the server) and ``"content"`` (decoded body).
        """
        chunks = []
        while byteBuffer := service.recv(512):
            chunks.append(byteBuffer)
        byteBlock = b"".join(chunks)

        # Split on the first blank line only; the body may contain more.
        headerBlock, _, contentBlock = byteBlock.partition(b"\r\n\r\n")
        statusLine, *headerLines = headerBlock.split(b"\r\n")

        statusParts = statusLine.split(b" ")
        status = statusParts[1].decode() if len(statusParts) > 1 else 0

        headers = {}
        for line in headerLines:
            # Split on the first colon only; values may contain colons.
            key, separator, value = line.partition(b":")
            if separator:
                headers[key.decode().strip()] = value.decode().strip()

        return {
            "status": status,
            "headers": headers,
            "content": contentBlock.decode()
        }
# Version: 0.3.0
# Rewrite: Incomplete
import re
import mpf # post file support
import utils
import enum
import jnet
import json
# Values for the `parseMode` argument of Chat.sendMessage, which forwards the
# value unchanged as the "parse_mode" request parameter.
Plaintext = ""  # default of Chat.sendMessage
Markdown = "markdown"
class Infrastructure:
    """Thin wrapper around the bot-level Telegram HTTP methods.

    Attributes:
        botKey: The bot token.
        baseUri: URL prefix to which method names are appended.
    """

    def __init__(self, key):
        self.botKey = key
        self.baseUri = "https://api.telegram.org/bot" + key

    def getMe(self):
        """Return the bot's own ``User``, or ``None`` when the call fails."""
        data = jnet.requestJson(self.baseUri + "/getMe")
        if data and data['ok']:
            return User(data["result"])
        return None

    def getUpdates(self, params=None):
        """Fetch pending updates.

        Args:
            params: Optional query parameters passed through to getUpdates.

        Returns:
            List of ``Update`` objects; empty when the call fails.
        """
        data = jnet.requestJson(self.baseUri + "/getUpdates", params)
        if data and data['ok']:
            return [Update(mapObj) for mapObj in data['result']]
        return []

    def getChat(self, id):
        """Return a ``Chat`` handle for the chat with the given id."""
        return Chat({'chat_id': id})

    def setWebHook(self, uri):
        """Register ``uri`` as webhook; prints a confirmation on success."""
        data = jnet.requestJson(self.baseUri + "/setWebhook", {"url": uri})
        if data and data['ok'] and data['result'] is True:
            print("Webhook set.")

    def deleteWebHook(self):
        """Remove the webhook; prints a confirmation on success."""
        data = jnet.requestJson(self.baseUri + "/deleteWebhook")
        if data and data['ok'] and data['result'] is True:
            print("Webhook deleted.")

    def getFileUri(self, fileId):
        """Return the download URL of a file, or ``None`` on failure.

        On an error response the error description is printed.
        """
        # baseUri is set by this class; botQueryUri only exists on Bot.
        data = jnet.requestJson(self.baseUri + "/getFile", {"file_id": fileId})
        if not data:
            return None
        if data['ok']:
            return "https://api.telegram.org/file/bot" + self.botKey + "/" + data['result']['file_path']
        print(data['description'])
        return None

    def getStickerSet(self, name):
        """Return the named ``StickerSet``, or ``None`` on failure.

        On an error response the error description is printed.
        """
        # baseUri is set by this class; botQueryUri only exists on Bot.
        data = jnet.requestJson(self.baseUri + "/getStickerSet", {"name": name})
        if not data:
            return None
        if data['ok']:
            return StickerSet(data['result'])
        print(data['description'])
        return None
class Optional:
    """Attribute bag built from a dict; unknown attributes read as ``None``.

    Every key of the source dict becomes an attribute, so optional fields
    can be tested with a plain ``if obj.field:``.
    """

    def __init__(self, data=None):
        """Copy every key/value pair of ``data`` onto the instance.

        Args:
            data: Mapping of field names to values; ``None`` means no fields.
        """
        for field, value in (data or {}).items():
            self.set(field, value)

    def __getattr__(self, field):
        # Only invoked after normal attribute lookup has failed, so real
        # attributes and methods are unaffected.
        return None

    def set(self, field, value):
        """Set attribute ``field`` to ``value``."""
        setattr(self, field, value)


class Error(Optional):
    """Attribute bag for error details."""

    def __init__(self, data=None):
        # The base initialiser needs a mapping; calling it without one
        # made this class impossible to instantiate.
        Optional.__init__(self, data or {})


class Extras(Optional):
    """Attribute bag for extra message details."""

    def __init__(self, data=None):
        # The base initialiser needs a mapping; calling it without one
        # made this class impossible to instantiate.
        Optional.__init__(self, data or {})
class Update(Optional):
    """An update record whose message payloads are wrapped in ``Message``."""

    def __init__(self, data):
        """Load the fields of ``data`` and wrap the message fields present."""
        super().__init__(data)
        for field in ("message", "edited_message"):
            payload = getattr(self, field)
            if payload:
                setattr(self, field, Message(payload))
class User(Optional):
    """A user record; its attributes are the keys of the source dict."""

    def __init__(self, data):
        """Load every field of ``data`` as an attribute."""
        super().__init__(data)
class Message(Optional):
    """A message record with its nested records wrapped in typed objects.

    After construction, when the corresponding field is present:
    ``sender`` is a ``User`` built from the ``from`` field, ``chat`` is a
    ``Chat``, ``reply_to_message`` is a ``Message``, ``new_chat_members``
    is a list of ``User`` and ``left_chat_member`` is a ``User``.
    """

    def __init__(self, data):
        Optional.__init__(self, data)
        # "from" is a keyword, hence getattr. Test the value itself:
        # hasattr() is always true here because missing attributes read as
        # None, which would then be passed to User().
        sender = getattr(self, "from")
        if sender:
            self.sender = User(sender)
        if self.chat:
            self.chat = Chat(self.chat)
        if self.reply_to_message:
            self.reply_to_message = Message(self.reply_to_message)
        if self.new_chat_members:
            self.new_chat_members = [User(user) for user in self.new_chat_members]
        if self.left_chat_member:
            self.left_chat_member = User(self.left_chat_member)
class Chat(Optional):
    """Handle for one chat; every method issues a request for that chat.

    ``Chat.setInfra`` must be called once with an object providing
    ``baseUri`` before any instance is created.
    """

    infra = None

    @staticmethod
    def setInfra(infra):
        """Set the shared infrastructure object used for all requests."""
        Chat.infra = infra

    def __init__(self, data):
        """Create a handle from a chat dict or from a bare integer chat id.

        Raises:
            RuntimeError: If ``setInfra`` has not been called yet.
        """
        if self.infra is None:
            # Raising a plain string is itself a TypeError in Python 3.
            raise RuntimeError("No infrastructure set.")
        if isinstance(data, dict):
            Optional.__init__(self, data)
        elif isinstance(data, int):
            self.chat_id = data
        # Chat dicts embedded in messages carry "id", not "chat_id"; make
        # such instances usable for requests as well.
        if self.chat_id is None and self.id is not None:
            self.chat_id = self.id

    def _request(self, method, params):
        """Call API ``method`` with ``params``; return the decoded reply or None."""
        return jnet.requestJson(self.infra.baseUri + "/" + method, params)

    def _resultOrReport(self, data):
        """Return ``data['result']`` on success, else report the error to the chat."""
        if not data:
            return None
        if data.get('ok'):
            return data['result']
        self.sendMessage(data['description'])
        return None

    def getMember(self, userId):
        """Return the ``ChatMember`` for ``userId``, or ``None`` on failure.

        An error description is sent to the chat.
        """
        data = self._request("getChatMember",
            {"chat_id": self.chat_id, "user_id": userId})
        if not data:
            return None
        if data['ok']:
            return ChatMember(data['result'])
        self.sendMessage(data['description'])
        return None

    def isAdmin(self, userId):
        """Return True when ``userId`` is an administrator or the creator."""
        user = self.getMember(userId)
        print("User:", user)
        # getMember returns None on failure; treat that as "not an admin".
        return user is not None and user.status in ("administrator", "creator")

    def sendMessage(self, message, parseMode=Plaintext):
        """Send ``message`` to the chat.

        Returns:
            The sent ``Message``, or ``None`` when sending failed. On an
            error response the description is printed and sent to the chat
            once as plain text.
        """
        data = self._request("sendMessage", {'chat_id': self.chat_id, 'text': message, "parse_mode": parseMode})
        if not data:
            return None
        if data['ok']:
            return Message(data['result'])
        print("Error description:", data['description'])
        # Report with a direct request rather than by calling sendMessage
        # again: if the report fails too, recursion would never terminate.
        self._request("sendMessage", {'chat_id': self.chat_id, 'text': data['description'], "parse_mode": Plaintext})
        return None

    def sendSticker(self, stickerId):
        """Send a sticker; return the API result or ``None`` on failure."""
        data = self._request("sendSticker", {'chat_id': self.chat_id, 'sticker': stickerId})
        return self._resultOrReport(data)

    # A status of true means good. Check reason if false.
    def sendMedia(self, file, to=None, caption=None, type="Document"):
        """Upload the file at path ``file`` using the ``send<type>`` method.

        Args:
            file: Path of the file to upload.
            to: Target chat id; defaults to this chat.
            caption: Optional caption.
            type: Method suffix, e.g. "Document", "Audio", "Video", "Photo".

        Returns:
            Dict with ``"status"`` (bool), ``"reason"`` when the status is
            false, and ``"link"`` when the message went to a supergroup.
        """
        to = str(to or self.chat_id)
        form = mpf.Form()
        form.addText(type.lower(), "attach://upload_file")
        form.addFile("upload_file", file)
        form.addText("chat_id", to)
        if caption:
            form.addText("caption", caption)
        result = mpf.BasicRequest().post(self.infra.baseUri + "/send" + type, form)

        # Header names are matched case-insensitively and the value may
        # carry parameters such as a charset.
        headers = {name.lower(): value for name, value in result['headers'].items()}
        if not headers.get('content-type', "").startswith("application/json"):
            return {"status": False, "reason": "Unexpected response (HTTP status %s)" % result['status']}

        data = json.loads(result['content'])
        response = {"status": data['ok']}
        if not data['ok']:
            response['reason'] = data['description']
        elif data['result']['chat']['type'] == "supergroup":
            sent = data['result']
            response['link'] = "http://t.me/" + str(sent['chat']['id']) + "/" + str(sent['message_id'])
        return response

    def deleteMessage(self, messageId):
        """Delete a message; return the API result or ``None`` on failure."""
        data = self._request("deleteMessage", {'chat_id': self.chat_id, 'message_id': messageId})
        return self._resultOrReport(data)

    # Send different type of media.
    def sendAudio(self, file, to=None, caption=None):
        """Upload ``file`` as audio; returns the ``sendMedia`` response."""
        return self.sendMedia(file, to, caption, "Audio")

    def sendVideo(self, file, to=None, caption=None):
        """Upload ``file`` as video; returns the ``sendMedia`` response."""
        return self.sendMedia(file, to, caption, "Video")

    def sendPhoto(self, file, to=None, caption=None):
        """Upload ``file`` as photo; returns the ``sendMedia`` response."""
        return self.sendMedia(file, to, caption, "Photo")

    def restrictMember(self, userId, permissions):
        """Apply ``permissions`` to ``userId``; return the API result or ``None``."""
        data = self._request("restrictChatMember", {"chat_id": self.chat_id, "user_id": userId, "permissions": permissions})
        return self._resultOrReport(data)

    # Returns True on success.
    def pin(self, messageId, disableNotify=False):
        """Pin a message; return the API result or ``None`` on failure."""
        # Returns 'Bad Request: CHAT_NOT_MODIFIED' if same message is pinned.
        data = self._request("pinChatMessage", {'chat_id': self.chat_id, 'message_id': messageId, 'disable_notification': disableNotify})
        return self._resultOrReport(data)

    # Returns True on success.
    def unpin(self, to):
        """Unpin the pinned message of this chat; ``to`` is not used.

        Returns the API result, or ``None`` on failure after printing the
        error description.
        """
        # Returns 'Bad Request: CHAT_NOT_MODIFIED' if no message is pinned.
        data = self._request("unpinChatMessage", {'chat_id': self.chat_id})
        if not data:
            return None
        if data.get('ok'):
            return data['result']
        print(data['description'])
        return None

    def getAdminsIds(self):
        """Return the user ids of the chat administrators.

        Returns ``None`` on failure after printing the error description.
        """
        data = self._request("getChatAdministrators", {"chat_id": self.chat_id})
        if not data:
            return None
        if data.get('ok'):
            return [member['user']['id'] for member in data['result']]
        print(data['description'])
        return None
class ChatMember(Optional):
    """A chat member record whose ``user`` field is wrapped in ``User``."""

    def __init__(self, data):
        """Load the fields of ``data`` and wrap the ``user`` field."""
        super().__init__(data)
        memberInfo = self.user
        self.user = User(memberInfo)
class ChatPermissions:
    """Permission presets held in ``data`` and serialised to JSON by ``str()``.

    ``level`` picks the preset applied on construction: ``None`` applies
    ``restrict``, ``"basic"`` applies ``unrestrict`` and ``"media"`` applies
    ``allow_media``. Any other value applies nothing.
    """

    def __init__(self, level=None):
        # The basic preset grants messages, polls and inviting members.
        presets = (
            (None, self.restrict),
            ("basic", self.unrestrict),
            ("media", self.allow_media),
        )
        for name, apply in presets:
            if level == name:
                apply()
                break

    def __str__(self):
        """Return ``data`` encoded as JSON."""
        encoded = json.dumps(self.data)
        return encoded

    def unrestrict(self):
        """Select the basic preset: messages, polls and invites only."""
        self.data = dict(
            can_send_messages=True,
            can_send_media=False,
            can_send_polls=True,
            can_send_other_messages=False,
            can_add_web_page_previews=False,
            can_change_info=False,
            can_invite_users=True,
            can_pin_messages=False,
        )

    def allow_media(self):
        """Select the media preset: the basic grants plus media grants."""
        basic = dict(
            can_send_messages=True,
            can_send_polls=True,
            can_invite_users=True,
        )
        media = dict(
            can_send_media=True,
            can_send_other_messages=True,
            can_add_web_page_previews=True,
        )
        self.data = {**basic, **media}

    def restrict(self):
        """Select the restricted preset: sending messages is denied."""
        self.data = dict(can_send_messages=False)
# Docs: https://core.telegram.org/bots/api#sticker
class Sticker(Optional):
    """A sticker record; its attributes are the keys of the source dict."""

    def __init__(self, data):
        """Load every field of ``data`` as an attribute."""
        super().__init__(data)
# Docs: https://core.telegram.org/bots/api#stickerset
class StickerSet(Optional):
    """A sticker set record whose ``stickers`` are wrapped in ``Sticker``."""

    def __init__(self, data):
        """Load the fields of ``data`` and wrap each entry of ``stickers``."""
        super().__init__(data)
        rawStickers = self.stickers
        if rawStickers:
            self.stickers = [Sticker(entry) for entry in rawStickers]
class InlineQueryResultGifs:
    """Accumulates GIF result entries with sequential string ids.

    ``list`` holds the entries in insertion order; ``id`` is the numeric id
    the next entry will receive.
    """

    def __init__(self):
        self.list = []
        self.id = 0

    def add(self, url, thumb_url):
        """Append a GIF entry for ``url`` with thumbnail ``thumb_url``."""
        entry = dict(
            type="gif",
            id=str(self.id),
            gif_url=url,
            thumb_url=thumb_url,
            thumb_mime_type="image/gif",
        )
        self.list.append(entry)
        self.id += 1
class Bot(Infrastructure):
    """Command-dispatching bot built on ``Infrastructure``.

    Handlers are registered with ``on(pattern, method)`` and are called as
    ``method(message, chat, extras)``. The pattern ``"__msg__"`` marks the
    handler for messages that did not match any command.
    """

    def __init__(self, botKey):
        Infrastructure.__init__(self, botKey)
        self.commands = []
        self.helpString = ""
        self.botQueryUri = "https://api.telegram.org/bot" + botKey
        self.inlineQueryMethod = None
        # NOTE(review): stored by addUnfilteredMsgHandler but not dispatched
        # anywhere in this class; initialised so reading it is always safe.
        self.unfilteredMsgHandlerMethod = None
        # bit switches
        self.logUsers = False
        self.botData = self.getMe()
        self.on("help", self.help)
        self.on("online", self.online)
        # Set the infrastructure for base classes.
        Chat.setInfra(self)

    # The heart of the framework.
    def start(self, globals=None):
        """Poll for updates forever and dispatch them.

        Args:
            globals: Optional namespace dict scanned by ``setCallbacks``.

        Raises:
            RuntimeError: If the bot's own user record could not be fetched.
        """
        print(self.botData)
        if self.botData is None:
            raise RuntimeError("Could not fetch bot info (getMe failed).")
        print("Started %s." % self.botData.username)
        # Automagically assigns callbacks.
        if globals:
            self.setCallbacks(globals)
        update_id = -1
        while True:
            updates = self.getUpdates({
                'allowed_updates': 'message',
                'timeout': 15,
                'offset': update_id+1
            })
            lastId = self.processUpdates(updates)
            # Keep the previous offset when nothing was processed (empty or
            # failed poll); resetting it would re-deliver updates that have
            # not been confirmed yet.
            if lastId is not None and lastId >= 0:
                update_id = lastId

    def processUpdates(self, updates):
        """Dispatch each update; return the id of the last one, or -1 if none."""
        update_id = -1
        for update in updates:
            # For webhooks, convert dicts to Updates
            if isinstance(update, dict):
                update = Update(update)
            message = update.message or update.edited_message
            if message:
                text, extras = self.processMessage(message)
                self.run(text, extras)
            if update.inline_query:
                self.processInlineQuery(update.inline_query)
            update_id = update.update_id
        return update_id

    @staticmethod
    def _splitCommand(text, extras):
        """Record the leading /command of ``text`` in ``extras``; return the rest."""
        commandStr, _, rest = text.partition(" ")
        commandStr, at, botName = commandStr.partition("@")
        if at:
            extras['commandBot'] = botName
        extras['commandStr'] = commandStr[1:]  # slice off the slash
        return rest

    @staticmethod
    def _collectMentions(msg):
        """Return user ids (text mentions) and mention strings found in ``msg``."""
        text = msg.text or ""
        mentions = []
        for entity in msg.entities:
            if entity['type'] == "text_mention":
                mentions.append(entity['user']['id'])
            elif entity['type'] == "mention":
                offset = entity['offset']
                mentions.append(text[offset:offset + entity['length']])
        return mentions

    def processMessage(self, msg):
        """Turn a ``Message`` into ``[text, extras]`` for the handlers.

        ``text`` is the message text with any leading command removed.
        ``extras`` always contains messageId, chatId, sender, senderId,
        username, private, replyMsg, bots and stickerMsg; commandStr,
        commandBot, mentions, the replyingTo* keys, stickerSet and
        stickerEmoji are added when they apply.
        """
        message = ""
        sender = msg.sender
        extras = {
            "messageId": msg.message_id,
            "chatId": msg.chat.id,
            # The sender is absent on messages that carry no "from" field.
            "sender": sender.first_name if sender else None,
            "senderId": sender.id if sender else None,
            "username": sender.username if sender else None
        }
        if msg.text:
            message = msg.text
            if message.startswith("/"):
                message = self._splitCommand(message, extras)
        extras['private'] = msg.chat.type == "private"

        if msg.entities:
            mentions = self._collectMentions(msg)
            if mentions:
                extras['mentions'] = mentions

        extras["replyMsg"] = False
        if msg.reply_to_message:
            reply = msg.reply_to_message
            extras["replyMsg"] = True
            extras.update({
                "replyingToMsgUserObj": reply.sender,
                "replyingToMsgId": reply.message_id
            })
            if reply.text:
                extras["replyingToMsgText"] = reply.text

        extras['bots'] = []
        if msg.new_chat_members:
            for user in msg.new_chat_members:
                if user.is_bot:
                    extras['bots'].append({
                        "user": user.first_name,
                        "id": user.id
                    })
            for user in msg.new_chat_members:
                print("Joined " + user.first_name)
        if msg.left_chat_member:
            print("Leaving " + msg.left_chat_member.first_name)

        extras['stickerMsg'] = False
        if msg.sticker:
            extras.update({
                # Same key as the default set above (it was misspelled, so
                # the flag never became True).
                "stickerMsg": True,
                'stickerSet': msg.sticker.get('set_name'),
                'stickerEmoji': msg.sticker.get('emoji')
            })

        if "commandStr" in extras or self.logUsers:
            print("Extras:", extras)
            print(extras['sender'], repr(message))
        return [message, extras]

    def processInlineQuery(self, query):
        """Forward an inline query dict to the registered inline handler."""
        if self.inlineQueryMethod:
            self.inlineQueryMethod(query['id'], query['query'])

    def setCallbacks(self, globals):
        """Register every ``__cmd__*`` / ``__msg__*`` callable found in ``globals``."""
        if globals:
            privmethods = utils.getPrivateMethods(globals)
            for method in privmethods:
                print(method)
                self.on(method.replace("__cmd__", ""), privmethods[method])

    def run(self, message, extras):
        """Dispatch one processed message.

        The first handler whose pattern equals the command is called and
        dispatch stops there. Otherwise every handler registered under
        ``"__msg__"`` is called.
        """
        chat = self.getChat(extras['chatId'])
        commandStr = extras.get('commandStr')
        if commandStr is not None:
            for pattern, command in self.commands:
                # "__msg__" is a reserved marker, not a command name.
                if pattern != "__msg__" and pattern == commandStr:
                    command(message, chat, extras)
                    return
        # self.commands is a list of [pattern, method] pairs, so the
        # message handler has to be searched for by its pattern.
        for pattern, handler in self.commands:
            if pattern == "__msg__":
                print("Running message handler")
                handler(message, chat, extras)

    def on(self, pattern, method):
        """Register ``method`` for the command (or marker) ``pattern``."""
        self.commands.append([pattern, method])

    def addHelpString(self, text):
        """Append a line to the text sent by the built-in help command."""
        self.helpString += text + "\n"

    def addUnfilteredMsgHandler(self, method):
        """Store ``method`` as the unfiltered message handler."""
        self.unfilteredMsgHandlerMethod = method

    def addInlineQueryMethod(self, method):
        """Set the handler called as ``method(queryId, queryText)``."""
        self.inlineQueryMethod = method

    # self event handlers
    def help(self, message, chat, extras):
        """Built-in handler: send the help text, or a notice if none is set."""
        if self.helpString == "":
            chat.sendMessage("No help set")
        else:
            chat.sendMessage(self.helpString)

    def online(self, message, chat, extras):
        """Built-in handler: confirm that the bot is running."""
        chat.sendMessage("I'm online!")
    # end

    def getMentionText(self, sender, userId):
        """Return a Markdown link that mentions ``userId``, labelled ``sender``."""
        return "[" + sender + "](tg://user?id=" + str(userId) + ")"

    def answerInlineQuery(self, queryId, answer):
        """Answer an inline query with the entries of ``answer.list``.

        Returns:
            True when the API reports success, otherwise False.
        """
        data = jnet.requestJson(self.botQueryUri + "/answerInlineQuery", {'inline_query_id': queryId, 'results': json.dumps(answer.list)})
        # requestJson returns a plain dict (or None), not a response object.
        if data and data.get('ok'):
            return True
        if data:
            print(data.get('description'))
        return False
# Utils
# Version: 0.0.2
# Get global private __cmd__ and __msg__ methods as a map.
def getPrivateMethods(glbs):
    """Collect the callables of ``glbs`` named ``__cmd__*`` or ``__msg__*``.

    Args:
        glbs: Mapping of names to objects, e.g. the result of ``globals()``.

    Returns:
        Dict of the matching names to their callables, in the order they
        appear in ``glbs``.
    """
    prefixes = ("__cmd__", "__msg__")
    return {
        name: glbs[name]
        for name in list(glbs.keys())
        if name.startswith(prefixes) and callable(glbs[name])
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment