You can find a basic bot here.
Last active
February 12, 2022 18:12
-
-
Save kaecy/3c1a6a6f2d39d9df907f6935d8b2ba40 to your computer and use it in GitHub Desktop.
Simple Telegram Bot API for Python. (requires python 3)
message contains:
str:
A string of text received from a message.
chat contains:
Chat object
The chat object connected to the message.
extras contains:
map:
chatId:int
The id of the chat the message is connected to.
messageId:int
The id of the message received.
private:bool
Whether the message was sent in a private chat.
sender:str
The one who sent the message.
senderId:int
The id of the sender.
stickerMsg:bool
Whether the message is a sticker message.
replyMsg:bool
Is the message a reply message.
replyingToMsgId:int
The id of the message being replied to.
replyingToMsgUserObj:object
The User object of the user of the message being replied to.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# jnet - library for requesting json | |
# Version: 0.0.4 | |
import json | |
import socket | |
import urllib.error | |
import urllib.parse | |
import urllib.request | |
JSONType = "application/json"


# Request JSON from a web resource.
# Only returns an object if the response carries application/json data.
def requestJson(uri, params=None, extras=None):
    """Perform an HTTP request and return the decoded JSON body, if any.

    Parameters:
        uri: Target URL without a query string.
        params: Optional mapping (or sequence of pairs) that is URL-encoded
            and appended to ``uri`` as the query string.
        extras: Optional dict of request options. Recognised keys are
            ``method`` (HTTP verb, default ``"GET"``), ``headers`` (dict
            merged over the default user-agent header) and ``payload``
            (request body passed to ``urllib.request.Request`` as ``data``).

    Returns:
        The parsed JSON value when the response content type starts with
        ``application/json`` (this includes HTTP error responses), otherwise
        ``None``. ``None`` is also returned when the request times out.

    Raises:
        urllib.error.URLError: for connection failures other than timeouts.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    extras = {} if extras is None else extras

    reqUri = uri
    if params:
        reqUri += "?" + urllib.parse.urlencode(params)

    headers = {"user-agent": "alpha/jnet"}
    headers.update(extras.get("headers", {}))
    req = urllib.request.Request(
        reqUri,
        data=extras.get("payload"),
        headers=headers,
        method=extras.get("method", "GET"),
    )

    try:
        res = urllib.request.urlopen(req, None, 60)
    except socket.timeout:
        print("JNet: Timeout.")
        return None
    except urllib.error.HTTPError as exception:
        # Error statuses may still carry a JSON body worth returning.
        res = exception
    except urllib.error.URLError as exception:
        # A timeout while connecting is reported wrapped in URLError.
        if isinstance(exception.reason, socket.timeout):
            print("JNet: Timeout.")
            return None
        raise

    try:
        content = res.read()
        # ``headers`` exists on every urlopen() result, unlike getheader().
        contentType = res.headers.get("content-type", "") or ""
    finally:
        res.close()

    if contentType.startswith(JSONType):
        return json.loads(content)
    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Multipart form library | |
# Created 08 Nov, 2020 | |
# v0.0.1 | |
import os | |
import io | |
import sys | |
import socket, ssl | |
import urllib.parse | |
import mimetypes | |
import enum | |
class FormContentType(enum.IntEnum):
    """Kind of payload carried by one part of a multipart form."""

    Void = 0  # default kind: no payload handling
    Text = 1  # payload is an in-memory string
    File = 2  # payload is a path to a file on disk
class FormContent:
    """One part of a multipart/form-data body.

    Parameters:
        name: Field name placed in the Content-Disposition header.
        type: A FormContentType value selecting how ``data`` is treated.
        data: The text itself for ``Text`` parts, or a file path for
            ``File`` parts.

    After construction ``headers`` holds the part's header lines (without
    line endings) and ``size`` the payload length in bytes.
    """

    def __init__(self, name, type=FormContentType.Void, data=None):
        self.name = name
        self.type = type
        # Always defined, so Void parts can be inspected like the others.
        self.data = data
        self.size = 0
        self.headers = [
            'Content-Disposition: form-data; name=\"%s\"' % (name)
        ]
        if type == FormContentType.Text:
            # The text is transmitted encoded, so count bytes, not characters.
            self.size = len(data.encode())
        elif type == FormContentType.File:
            basename = os.path.basename(data)
            self.size = os.path.getsize(data)
            # Apply the fallback to the guessed type itself; formatting first
            # and then using ``or`` always produced "Content-Type: None".
            mimeType = mimetypes.guess_type(data)[0] or "application/octet-stream"
            self.headers = [
                'Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"' % (name, basename),
                'Content-Type: %s' % (mimeType),
            ]
class Form:
    """A multipart/form-data body description with precomputed headers.

    ``headers`` always holds the Content-Type and Content-Length request
    header lines matching the current ``contents``; it is refreshed every
    time a part is added.
    """

    def __init__(self):
        self.boundary = "666-devils-666"
        self.headers = []
        self.contents = []  # array of FormContent
        self.calculateSize()

    def calculateSize(self):
        """Recompute the body length in bytes and rebuild ``headers``."""
        # Everything is sent encoded, so lengths are measured on the bytes.
        boundaryLength = len(self.boundary.encode())
        # closing delimiter: "--" + boundary + "--"
        size = boundaryLength + 4
        for content in self.contents:
            # opening delimiter: "--" + boundary + CRLF
            size += boundaryLength + 4
            # each header line is followed by CRLF
            size += sum(len(header.encode()) + 2 for header in content.headers)
            # blank line terminating the part headers
            size += 2
            size += content.size
            # CRLF after the payload
            size += 2
        self.headers = [
            "Content-Type: multipart/form-data; boundary=" + self.boundary,
            "Content-Length: " + str(size)
        ]

    def addText(self, name, text):
        """Append a text field and refresh the headers."""
        self.contents.append(FormContent(name, FormContentType.Text, text))
        self.calculateSize()

    def addFile(self, name, file):
        """Append a file field (``file`` is a path) and refresh the headers."""
        self.contents.append(FormContent(name, FormContentType.File, file))
        self.calculateSize()
class SocketFile(io.FileIO):
    """File object exposing socket-style ``send``/``recv`` methods."""

    def send(self, data):
        # Everything "sent" is written to the underlying file.
        self.write(data)

    def recv(self, i):
        # There is no peer: the same placeholder is returned on every call
        # (never an empty bytes object), whatever size was asked for.
        return b"Nothing returned"
class BasicRequest:
    """Minimal HTTP/1.0 client that POSTs a multipart Form over a socket."""

    def post(self, url, form, service=None):
        """Send ``form`` to ``url`` and return the parsed response.

        Parameters:
            url: Absolute URL of the endpoint.
            form: Object with ``headers``, ``boundary`` and ``contents``
                (see Form).
            service: Optional already-connected object with ``send``,
                ``recv`` and ``close``. When omitted a TLS connection to the
                URL's host is opened (port 443 unless the URL names one).

        Returns:
            The dict produced by parseResponse().
        """
        # Parse unconditionally: the request line needs the path even when
        # the caller supplies its own service.
        url = urllib.parse.urlparse(url)
        target = url.path or "/"
        if url.query:
            target += "?" + url.query

        ownsSocket = service is None
        if ownsSocket:
            # The default context verifies the certificate and host name.
            ssl_context = ssl.create_default_context()
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            service = ssl_context.wrap_socket(sock, server_hostname=url.hostname)
        try:
            if ownsSocket:
                service.connect((url.hostname, url.port or 443))
            self._send(service, ("POST %s HTTP/1.0\r\n" % (target)).encode())
            self._send(service, ("Host: %s\r\n" % (url.netloc)).encode())
            print("Writing http request headers")
            self.writeHeaders(form.headers, service)
            self._send(service, "\r\n".encode())
            print("Writing body")
            self.writeContent(form, service)
            print("Waiting for response")
            result = self.parseResponse(service)
        finally:
            # Release the connection even when sending or parsing fails.
            service.close()
        return result

    @staticmethod
    def _send(service, data):
        """Write all of ``data`` to ``service``."""
        # send() on a real socket may accept only part of the buffer, so
        # prefer sendall() and fall back to send() for file-like stand-ins.
        sender = getattr(service, "sendall", None) or service.send
        sender(data)

    def writeHeaders(self, headers, service):
        """Write each header line followed by CRLF."""
        for header in headers:
            self._send(service, header.encode())
            self._send(service, "\r\n".encode())

    def writeContent(self, form, service):
        """Write every form part followed by the closing delimiter."""
        delimiter = ("--" + form.boundary).encode()
        for content in form.contents:
            self._send(service, delimiter + "\r\n".encode())
            self.writeHeaders(content.headers, service)
            self._send(service, "\r\n".encode())
            if content.type == FormContentType.Text:
                self._send(service, content.data.encode())
            elif content.type == FormContentType.File:
                print("writing file")
                # Stream the file in chunks; the handle is closed even if
                # sending fails part-way.
                with open(content.data, "rb") as f:
                    while data := f.read(4096):
                        self._send(service, data)
            self._send(service, "\r\n".encode())
        self._send(service, delimiter + "--".encode())

    def parseResponse(self, service):
        """Read the response until the peer closes and split it up.

        Returns:
            A dict with ``status`` (status code as a string, or 0 when no
            status line was received), ``headers`` (dict of header name to
            value) and ``content`` (decoded body text).
        """
        chunks = []
        while byteBuffer := service.recv(512):
            chunks.append(byteBuffer)
        byteBlock = b"".join(chunks)

        # Split on the first blank line only: the body may contain more.
        headerBlock, _, contentBlock = byteBlock.partition(b"\r\n\r\n")
        statusLine, *headerLines = headerBlock.split(b"\r\n")
        statusParts = statusLine.split(b" ")
        status = statusParts[1].decode() if len(statusParts) > 1 else 0

        headers = {}
        for line in headerLines:
            # Split on the first colon only: values may contain colons.
            key, _, value = line.partition(b":")
            headers[key.decode().strip()] = value.decode().strip()

        return {
            "status": status,
            "headers": headers,
            "content": contentBlock.decode()
        }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Version: 0.3.0 | |
# Rewrite: Incomplete | |
import re | |
import mpf # post file support | |
import utils | |
import enum | |
import jnet | |
import json | |
# Values passed as the "parse_mode" request parameter when sending a message.
Plaintext, Markdown = "", "markdown"
class Infrastructure:
    """Bot API calls that are not tied to a particular chat.

    Parameters:
        key: The bot token; it is appended to the API base address to form
            ``baseUri``.
    """

    def __init__(self, key):
        self.botKey = key
        self.baseUri = "https://api.telegram.org/bot" + key

    def getMe(self):
        """Return the bot's own User, or None when the call fails."""
        data = jnet.requestJson(self.baseUri + "/getMe")
        if data and data.get('ok'):
            return User(data["result"])
        return None

    def getUpdates(self, params=None):
        """Return pending updates as a list of Update objects.

        An empty list is returned when the request fails.
        """
        data = jnet.requestJson(self.baseUri + "/getUpdates", params)
        if data and data.get('ok'):
            return [Update(mapObj) for mapObj in data['result']]
        return []

    def getChat(self, id):
        """Return a Chat bound to the given chat id (no request is made)."""
        return Chat({'chat_id': id})

    def setWebHook(self, uri):
        """Register ``uri`` as the webhook and print a confirmation."""
        data = jnet.requestJson(self.baseUri + "/setWebhook", {"url": uri})
        if data and data.get('ok'):
            if data['result'] == True: print("Webhook set.")

    def deleteWebHook(self):
        """Remove the webhook and print a confirmation."""
        data = jnet.requestJson(self.baseUri + "/deleteWebhook")
        if data and data.get('ok'):
            if data['result'] == True: print("Webhook deleted.")

    def getFileUri(self, fileId):
        """Return the download URL for ``fileId``, or None on failure.

        On an API error the error description is printed.
        """
        # baseUri is used rather than botQueryUri: only the Bot subclass
        # defines the latter, so this method failed on a plain Infrastructure.
        data = jnet.requestJson(self.baseUri + "/getFile", {"file_id": fileId})
        if data:
            if data.get('ok'):
                return "https://api.telegram.org/file/bot" + self.botKey + "/" + data['result']['file_path']
            print(data.get('description'))
        return None

    def getStickerSet(self, name):
        """Return the StickerSet called ``name``, or None on failure.

        On an API error the error description is printed.
        """
        data = jnet.requestJson(self.baseUri + "/getStickerSet", {"name": name})
        if data:
            if data.get('ok'):
                return StickerSet(data['result'])
            print(data.get('description'))
        return None
class Optional:
    """Attribute bag built from a dict; unknown attributes read as None.

    Parameters:
        data: Optional mapping whose items become instance attributes.
            Omitting it (or passing None) creates an empty object.
    """

    def __init__(self, data=None):
        # ``data`` is optional so subclasses that start empty can call this
        # initialiser without arguments.
        for field, value in (data or {}).items():
            self.set(field, value)

    def __getattr__(self, field):
        # Only reached when normal lookup fails, so existing attributes and
        # methods are unaffected while missing ones read as None.
        return None

    def __repr__(self):
        # Show the stored fields instead of the default object address.
        return "%s(%r)" % (type(self).__name__, vars(self))

    def set(self, field, value):
        """Store ``value`` under the attribute name ``field``."""
        setattr(self, field, value)
class Error(Optional):
    """Optional-style container that starts out empty unless given data."""

    def __init__(self, data=None):
        # The base initialiser needs a mapping; calling it with no argument
        # made every construction fail.
        Optional.__init__(self, data or {})
class Extras(Optional):
    """Optional-style container that starts out empty unless given data."""

    def __init__(self, data=None):
        # The base initialiser needs a mapping; calling it with no argument
        # made every construction fail.
        Optional.__init__(self, data or {})
class Update(Optional):
    """An incoming update whose message payloads are wrapped as Message."""

    def __init__(self, data):
        Optional.__init__(self, data)
        # Both fields carry the same kind of payload; wrap whichever is set.
        for field in ("message", "edited_message"):
            payload = getattr(self, field)
            if payload:
                setattr(self, field, Message(payload))
class User(Optional):
    """A user record; every key of ``data`` becomes an attribute."""

    def __init__(self, data):
        super().__init__(data)
class Message(Optional):
    """A message whose nested records are wrapped in their own classes.

    ``sender`` holds the User taken from the ``from`` field; ``chat``,
    ``reply_to_message``, ``new_chat_members`` and ``left_chat_member`` are
    converted to Chat, Message and User objects when present.
    """

    def __init__(self, data):
        Optional.__init__(self, data)
        # "from" is a Python keyword, so it is exposed as ``sender``.
        # hasattr() cannot be used to test for it: missing attributes read
        # as None on an Optional, so the value itself is checked instead.
        sender = getattr(self, "from")
        if sender:
            self.sender = User(sender)
        if self.chat:
            self.chat = Chat(self.chat)
        if self.reply_to_message:
            self.reply_to_message = Message(self.reply_to_message)
        if self.new_chat_members:
            self.new_chat_members = [User(user) for user in self.new_chat_members]
        if self.left_chat_member:
            self.left_chat_member = User(self.left_chat_member)
class Chat(Optional):
    """A chat together with the Bot API calls that target it.

    Every request goes through ``Chat.infra.baseUri``, so setInfra() must be
    called before the first Chat is created.
    """

    # Shared object providing ``baseUri`` for every chat.
    infra = None

    @staticmethod
    def setInfra(infra):
        """Set the object whose ``baseUri`` all chats use for requests."""
        Chat.infra = infra

    def __init__(self, data):
        """Build a chat from a dict of fields or from a bare integer id.

        Raises:
            RuntimeError: when setInfra() has not been called yet.
        """
        if self.infra is None:
            # Raising a plain string is itself an error in Python 3.
            raise RuntimeError("No infrastructure set.")
        if isinstance(data, dict):
            Optional.__init__(self, data)
        elif isinstance(data, int):
            self.chat_id = data
        # Chats taken from a message carry "id" (it is read as msg.chat.id
        # elsewhere in this file) while the methods below use chat_id.
        if self.chat_id is None and self.id is not None:
            self.chat_id = self.id

    def _request(self, method, params):
        """Call Bot API ``method`` and return the decoded reply or None."""
        return jnet.requestJson(self.infra.baseUri + "/" + method, params)

    def _resultOrReport(self, data):
        """Return ``data['result']`` on success.

        On an API error the description is sent to the chat and None is
        returned; None is also returned when there was no reply at all.
        """
        if not data:
            return None
        if data.get('ok'):
            return data.get('result')
        self.sendMessage(data.get('description', "Unknown error"))
        return None

    def getMember(self, userId):
        """Return the ChatMember for ``userId``, or None on failure."""
        result = self._resultOrReport(self._request(
            "getChatMember", {"chat_id": self.chat_id, "user_id": userId}))
        return ChatMember(result) if result else None

    def isAdmin(self, userId):
        """Return True when ``userId`` is an administrator or the creator."""
        user = self.getMember(userId)
        print("User:", user)
        # getMember() yields None on failure; treat that as "not an admin".
        return user is not None and user.status in ("administrator", "creator")

    def sendMessage(self, message, parseMode=Plaintext):
        """Send ``message`` to the chat.

        Returns:
            The sent Message, or None when sending failed. On an API error
            the description is printed and sent to the chat once.
        """
        data = self._request("sendMessage", {
            'chat_id': self.chat_id, 'text': message, "parse_mode": parseMode})
        if not data:
            return None
        if data.get('ok'):
            return Message(data['result'])
        description = data.get('description', "Unknown error")
        print("Error description:", description)
        # Report with a single direct request instead of calling sendMessage
        # again: when the report fails as well, recursion would never end.
        self._request("sendMessage", {
            'chat_id': self.chat_id, 'text': description, "parse_mode": Plaintext})
        return None

    def sendSticker(self, stickerId):
        """Send a sticker; return the API result or None on failure."""
        return self._resultOrReport(self._request(
            "sendSticker", {'chat_id': self.chat_id, 'sticker': stickerId}))

    # A status of true means good. Check reason if false.
    def sendMedia(self, file, to=None, caption=None, type="Document"):
        """Upload ``file`` with the ``send<type>`` method.

        Parameters:
            file: Path of the file to upload.
            to: Target chat id; defaults to this chat.
            caption: Optional caption text.
            type: Media kind appended to "send" ("Document", "Audio", ...).

        Returns:
            A dict with ``status`` and, on failure, ``reason``; for
            supergroups a ``link`` to the message is included. None when the
            reply is not JSON.
        """
        to = to or str(self.chat_id)
        form = mpf.Form()
        form.addText(type.lower(), "attach://upload_file")
        form.addFile("upload_file", file)
        # Form text fields must be strings, so an integer id is converted.
        form.addText("chat_id", str(to))
        if caption:
            form.addText("caption", caption)
        result = mpf.BasicRequest().post(self.infra.baseUri + "/send" + type, form)

        # Header names are compared case-insensitively and the value may
        # carry parameters after the media type.
        headers = {key.lower(): value for key, value in result['headers'].items()}
        if not headers.get("content-type", "").startswith("application/json"):
            return None

        data = json.loads(result['content'])
        response = {"status": bool(data.get('ok'))}
        if not response['status']:
            response['reason'] = data.get('description')
            self.sendMessage(data.get('description', "Unknown error"))
            return response
        sent = data['result']
        if sent['chat']['type'] == "supergroup":
            response['link'] = "http://t.me/" + str(sent['chat']['id']) + "/" + str(sent['message_id'])
        return response

    def deleteMessage(self, messageId):
        """Delete a message; return the API result or None on failure."""
        return self._resultOrReport(self._request(
            "deleteMessage", {'chat_id': self.chat_id, 'message_id': messageId}))

    # Send different type of media.
    def sendAudio(self, file, to=None, caption=None):
        """Upload ``file`` as audio; see sendMedia() for the return value."""
        return self.sendMedia(file, to, caption, "Audio")

    def sendVideo(self, file, to=None, caption=None):
        """Upload ``file`` as video; see sendMedia() for the return value."""
        return self.sendMedia(file, to, caption, "Video")

    def sendPhoto(self, file, to=None, caption=None):
        """Upload ``file`` as a photo; see sendMedia() for the return value."""
        return self.sendMedia(file, to, caption, "Photo")

    def restrictMember(self, userId, permissions):
        """Apply ``permissions`` to ``userId``; return the API result."""
        return self._resultOrReport(self._request(
            "restrictChatMember",
            {"chat_id": self.chat_id, "user_id": userId, "permissions": permissions}))

    # Returns True on success.
    def pin(self, messageId, disableNotify=False):
        """Pin ``messageId`` in the chat."""
        # Returns 'Bad Request: CHAT_NOT_MODIFIED' if same message is pinned.
        return self._resultOrReport(self._request(
            "pinChatMessage",
            {'chat_id': self.chat_id, 'message_id': messageId,
             'disable_notification': disableNotify}))

    # Returns True on success.
    def unpin(self, to=None):
        """Unpin the chat's pinned message (``to`` is accepted but unused)."""
        # Returns 'Bad Request: CHAT_NOT_MODIFIED' if no message is pinned.
        data = self._request("unpinChatMessage", {'chat_id': self.chat_id})
        if not data:
            return None
        if not data.get('ok'):
            print(data.get('description'))
            return None
        return data['result']

    def getAdminsIds(self):
        """Return the user ids of the chat's administrators.

        None is returned on failure; API errors are printed.
        """
        data = self._request("getChatAdministrators", {"chat_id": self.chat_id})
        if not data:
            return None
        if not data.get('ok'):
            print(data.get('description'))
            return None
        return [member['user']['id'] for member in data['result']]
class ChatMember(Optional):
    """A chat member record whose ``user`` field is wrapped as a User."""

    def __init__(self, data):
        super().__init__(data)
        memberUser = self.user
        self.user = User(memberUser)
class ChatPermissions:
    """Preset permission maps, serialised to JSON by ``str()``.

    ``level`` selects the preset: None restricts, "basic" unrestricts and
    "media" additionally allows media. Any other value leaves ``data``
    unset.
    """

    def __init__(self, level=None):
        # all members have messages, poll, and add members permissions.
        if level is None:
            self.restrict()
            return
        if level == "basic":
            self.unrestrict()
        elif level == "media":
            self.allow_media()

    def __str__(self):
        return json.dumps(self.data)

    def unrestrict(self):
        """Allow messages, polls and invites; deny everything else listed."""
        granted = ("can_send_messages", "can_send_polls", "can_invite_users")
        ordered = (
            "can_send_messages",
            "can_send_media",
            "can_send_polls",
            "can_send_other_messages",
            "can_add_web_page_previews",
            "can_change_info",
            "can_invite_users",
            "can_pin_messages",
        )
        self.data = {key: key in granted for key in ordered}

    def allow_media(self):
        """Allow the basic permissions plus the media-related ones."""
        basic = ("can_send_messages", "can_send_polls", "can_invite_users")
        media = ("can_send_media", "can_send_other_messages", "can_add_web_page_previews")
        self.data = dict.fromkeys(basic + media, True)

    def restrict(self):
        """Deny sending messages."""
        self.data = dict(can_send_messages=False)
# Docs: https://core.telegram.org/bots/api#sticker
class Sticker(Optional):
    """A sticker record; every key of ``data`` becomes an attribute."""

    def __init__(self, data):
        super().__init__(data)
# Docs: https://core.telegram.org/bots/api#stickerset
class StickerSet(Optional):
    """A sticker set whose ``stickers`` entries are wrapped as Sticker."""

    def __init__(self, data):
        super().__init__(data)
        rawStickers = self.stickers
        if rawStickers:
            self.stickers = [Sticker(entry) for entry in rawStickers]
class InlineQueryResultGifs:
    """Accumulates GIF entries for an inline query answer.

    ``list`` holds the entries in insertion order; ``id`` is the counter
    used for the next entry's string id.
    """

    def __init__(self):
        self.list = []
        self.id = 0

    def add(self, url, thumb_url):
        """Append one GIF entry and advance the id counter."""
        entry = dict(
            type="gif",
            id=str(self.id),
            gif_url=url,
            thumb_url=thumb_url,
            thumb_mime_type="image/gif",
        )
        self.list.append(entry)
        self.id += 1
class Bot(Infrastructure):
    """Long-polling bot that dispatches slash commands to callbacks.

    Handlers are called as ``handler(message, chat, extras)`` where
    ``message`` is the text after the command, ``chat`` the Chat to answer
    in and ``extras`` the dict built by processMessage().
    """

    def __init__(self, botKey):
        Infrastructure.__init__(self, botKey)
        self.commands = []  # list of [pattern, method] pairs
        self.helpString = ""
        self.botQueryUri = "https://api.telegram.org/bot" + botKey
        self.inlineQueryMethod = None
        # Defined up front so run() can test it before a handler is added.
        self.unfilteredMsgHandlerMethod = None
        # bit switches
        self.logUsers = False
        self.botData = self.getMe()
        self.on("help", self.help)
        self.on("online", self.online)
        # Set the infrastructure for base classes.
        Chat.setInfra(self)

    # The heart of the framework.
    def start(self, globals=None):
        """Poll for updates forever and dispatch them.

        Parameters:
            globals: Optional namespace dict; its ``__cmd__``/``__msg__``
                callables are registered as handlers.
        """
        print(self.botData)
        # botData is None when getMe() failed during construction.
        print("Started %s." % (self.botData.username if self.botData else None))
        # Automagically assigns callbacks.
        if globals:
            self.setCallbacks(globals)
        update_id = -1
        while True:
            updates = self.getUpdates({
                'allowed_updates': 'message',
                'timeout': 15,
                'offset': update_id + 1
            })
            last_id = self.processUpdates(updates)
            # Keep the previous offset when a poll returns nothing (timeout
            # or failed request); resetting it could re-deliver updates that
            # were already handled.
            if last_id != -1:
                update_id = last_id

    def processUpdates(self, updates):
        """Dispatch each update; return the last update id, or -1 if none."""
        update_id = -1
        for update in updates:
            # For webhooks, convert dicts to Updates
            if isinstance(update, dict):
                update = Update(update)
            message = update.message or update.edited_message
            if message:
                text, extras = self.processMessage(message)
                self.run(text, extras)
            if update.inline_query:
                self.processInlineQuery(update.inline_query)
            update_id = update.update_id
        return update_id

    def processMessage(self, msg):
        """Extract the text and a dict of details from a Message.

        Returns:
            ``[message, extras]`` where ``message`` is the text (without the
            leading command, if any) and ``extras`` the details dict.
        """
        message = ""
        # Not every message has a sender, so its fields may be None.
        sender = msg.sender
        extras = {
            "messageId": msg.message_id,
            "chatId": msg.chat.id,
            "sender": sender.first_name if sender else None,
            "senderId": sender.id if sender else None,
            "username": sender.username if sender else None
        }

        if msg.text:
            message = msg.text
            if message.startswith("/"):
                # Without a space the whole text is the command and the
                # remaining message is empty.
                commandStr, _, message = message.partition(" ")
                if "@" in commandStr:
                    commandStr, extras['commandBot'] = commandStr.split("@", 1)
                extras['commandStr'] = commandStr[1:]  # slice off the slash

        extras['private'] = msg.chat.type == "private"

        if msg.entities:
            mentions = []
            for entity in msg.entities:
                if entity['type'] == "text_mention":
                    mentions.append(entity['user']['id'])
                elif entity['type'] == "mention":
                    offset = entity['offset']
                    length = entity['length']
                    mentions.append((msg.text or "")[offset:offset + length])
            if mentions:
                extras['mentions'] = mentions

        extras["replyMsg"] = False
        if msg.reply_to_message:
            reply = msg.reply_to_message
            extras["replyMsg"] = True
            extras.update({
                "replyingToMsgUserObj": reply.sender,
                "replyingToMsgId": reply.message_id
            })
            if reply.text:
                extras["replyingToMsgText"] = reply.text

        extras['bots'] = []
        if msg.new_chat_members:
            for user in msg.new_chat_members:
                if user.is_bot:
                    extras['bots'].append({
                        "user": user.first_name,
                        "id": user.id
                    })
            for user in msg.new_chat_members:
                print("Joined " + user.first_name)
        if msg.left_chat_member:
            print("Leaving " + msg.left_chat_member.first_name)

        extras['stickerMsg'] = False
        if msg.sticker:
            extras.update({
                # The key was misspelled before, so the flag stayed False.
                "stickerMsg": True,
                # Either field may be absent from the sticker record.
                'stickerSet': msg.sticker.get('set_name'),
                'stickerEmoji': msg.sticker.get('emoji')
            })

        if "commandStr" in extras or self.logUsers:
            print("Extras:", extras)
            print(extras['sender'], repr(message))
        return [message, extras]

    def processInlineQuery(self, query):
        """Forward an inline query's id and text to the registered method."""
        if self.inlineQueryMethod:
            self.inlineQueryMethod(query['id'], query['query'])

    def setCallbacks(self, globals):
        """Register the ``__cmd__``/``__msg__`` callables of a namespace."""
        if globals:
            privmethods = utils.getPrivateMethods(globals)
            for method in privmethods:
                print(method)
                self.on(method.replace("__cmd__", ""), privmethods[method])

    def run(self, message, extras):
        """Call the handler matching the command, else the message handlers."""
        chat = self.getChat(extras['chatId'])
        if "commandStr" in extras:
            for pattern, command in self.commands:
                if pattern == extras['commandStr']:
                    command(message, chat, extras)
                    return
        # commands is a list of pairs, so the "__msg__" handler has to be
        # found by scanning the patterns rather than by key lookup.
        for pattern, command in self.commands:
            if pattern == "__msg__":
                print("Running message handler")
                command(message, chat, extras)
        # NOTE(review): assumes the unfiltered handler takes the same
        # (message, chat, extras) arguments as the other handlers - confirm.
        if self.unfilteredMsgHandlerMethod:
            self.unfilteredMsgHandlerMethod(message, chat, extras)

    def on(self, pattern, method):
        """Register ``method`` as the handler for command ``pattern``."""
        self.commands.append([pattern, method])

    def addHelpString(self, text):
        """Append a line to the text sent by the help command."""
        self.helpString += text + "\n"

    def addUnfilteredMsgHandler(self, method):
        """Set the handler for messages that matched no command."""
        self.unfilteredMsgHandlerMethod = method

    def addInlineQueryMethod(self, method):
        """Set the callable invoked as ``method(queryId, queryText)``."""
        self.inlineQueryMethod = method

    # self event handlers
    def help(self, message, chat, extras):
        if self.helpString == "":
            chat.sendMessage("No help set")
        else:
            chat.sendMessage(self.helpString)

    def online(self, message, chat, extras):
        chat.sendMessage("I'm online!")
    # end

    def getMentionText(self, sender, userId):
        """Return a Markdown link that mentions the user by id."""
        return "[" + sender + "](tg://user?id=" + str(userId) + ")"

    def answerInlineQuery(self, queryId, answer):
        """Send ``answer.list`` as the results; return True on success."""
        data = jnet.requestJson(
            self.botQueryUri + "/answerInlineQuery",
            {'inline_query_id': queryId, 'results': json.dumps(answer.list)})
        # requestJson returns a plain dict (or None), not a response object.
        return bool(data and data.get('ok'))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Utils | |
# Version: 0.0.2 | |
# Get global private __cmd__ and __msg__ methods as a map.
def getPrivateMethods(glbs):
    """Return the callables of ``glbs`` named ``__cmd__*`` or ``__msg__*``.

    ``glbs`` is a name-to-object mapping; the result maps each matching
    name to its callable.
    """
    prefixes = ("__cmd__", "__msg__")
    # The key list is copied first so the scan works on a fixed snapshot.
    return {
        name: glbs[name]
        for name in list(glbs.keys())
        if name.startswith(prefixes) and callable(glbs[name])
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I made a bot using this.