Last active
February 7, 2016 20:31
-
-
Save makiftasova/c4670e7110db7deae09c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Mehmet Akif TAŞOVA <[email protected]> | |
A Sample class derieved from telepot.async.Bot which has an implemented command handle mechanism | |
""" | |
import telepot | |
import telepot.async | |
import asyncio | |
import pprint # for test code | |
class CommandAwareBot(telepot.async.Bot): | |
@staticmethod | |
def default_on_message(bot, chat_id, content_type, msg): | |
pass | |
@staticmethod | |
def default_on_inline_query(bot, query_id, from_id, query_string, msg): | |
pass | |
@staticmethod | |
def default_on_inline_query_answer(bot, result_id, from_id, query_string, msg): | |
pass | |
@staticmethod | |
def default_on_unknown_command(bot, chat_id, cmd, msg): | |
pass | |
@staticmethod | |
def default_on_user_join(bot, chat_id, msg): | |
pass | |
@staticmethod | |
def default_on_user_leave(bot, chat_id, msg): | |
pass | |
@staticmethod | |
def is_command(msg): | |
if 'text' in msg: | |
if msg['text'][0] == '/': | |
return True | |
return False | |
@staticmethod | |
def get_command(msg): | |
if CommandAwareBot.is_command(msg): | |
return msg['text'] | |
return "" | |
@staticmethod | |
def parse_command(cmd): | |
""" | |
:param cmd: whole command text with all params. eg: "/command param1 param2 ..." | |
:return: cmd, params. cmd will be something like "/command" | |
""" | |
txt_split = cmd.split() | |
return txt_split[0].split("@")[0], txt_split[1:] | |
@staticmethod | |
def get_message_owner(msg): | |
return msg['from'] | |
@staticmethod | |
def get_chat_id(msg): | |
return msg['chat']['id'] | |
def __init__(self, bot_token): | |
super(CommandAwareBot, self).__init__(bot_token) | |
self.commands = {} | |
self.on_message_callback = CommandAwareBot.default_on_message | |
self.on_inline_query_callback = CommandAwareBot.default_on_inline_query | |
self.on_inline_query_answer_callback = CommandAwareBot.default_on_inline_query_answer | |
self.on_unknown_command_callback = CommandAwareBot.default_on_unknown_command | |
self.on_user_join_callback = CommandAwareBot.default_on_user_join | |
self.on_user_leave_callback = CommandAwareBot.default_on_user_leave | |
def set_on_message_handler(self, func=default_on_message): | |
self.on_message_callback = func | |
def set_on_inline_query_handler(self, func=default_on_inline_query): | |
self.on_inline_query_callback = func | |
def set_on_inline_query_answer_handler(self, func=default_on_inline_query_answer): | |
self.on_inline_query_answer_callback = func | |
def set_on_unknown_command_handler(self, func=default_on_unknown_command): | |
self.on_unknown_command_callback = func | |
def set_on_user_join_handler(self, func=default_on_user_join): | |
self.on_user_join_callback = func | |
def set_on_user_leave_handler(self, func=default_on_user_leave): | |
self.on_user_leave_callback = func | |
def add_command_handler(self, cmd, func): | |
self.commands[cmd] = func | |
def remove_command_handler(self, cmd): | |
if cmd in self.commands: | |
del self.commands[cmd] | |
@asyncio.coroutine | |
def handle(self, msg): | |
flavor = telepot.flavor(msg) | |
if flavor == 'normal': | |
content_type, chat_type, chat_id = telepot.glance2(msg) | |
if CommandAwareBot.is_command(msg): | |
cmd, params = CommandAwareBot.parse_command(CommandAwareBot.get_command(msg)) | |
if cmd in self.commands: | |
yield from self.commands[cmd](self, chat_id, params, msg['from']) | |
else: | |
yield from self.on_unknown_command_callback(self, chat_id, cmd, msg) | |
elif content_type == 'new_chat_participant': | |
yield from self.on_user_join_callback(self, chat_id, msg) | |
elif content_type == 'left_chat_participant': | |
yield from self.on_user_leave_callback(self, chat_id, msg) | |
else: | |
yield from self.on_message_callback(self, chat_id, content_type, msg) | |
elif flavor == "inline_query": | |
query_id, from_id, query_string = telepot.glance2(msg, flavor=flavor) | |
yield from self.on_inline_query_callback(self, query_id, from_id, query_string, msg) | |
elif flavor == 'chosen_inline_result': | |
result_id, from_id, query_string = telepot.glance2(msg, flavor=flavor) | |
yield from self.on_inline_query_answer_callback(self, result_id, from_id, query_string, msg) | |
else: | |
raise telepot.BadFlavor(msg) | |
# TEST CODE BEGINS | |
@asyncio.coroutine
def command_start(bot, chat_id, params, user):
    """Reply to the chat with a greeting for ``user``, then pretty-print ``user``.

    ``params`` is accepted to match the command-handler signature but is unused.
    """
    greeting = "Hello {name}".format(name=user['first_name'])
    yield from bot.sendMessage(chat_id, greeting)
    pprint.pprint(user)
@asyncio.coroutine
def bot_on_message(bot, chat_id, content_type, msg):
    """Pretty-print the incoming message and reply with its content type."""
    pprint.pprint(msg)
    reply = "Message received. type: {ctype}".format(ctype=content_type)
    yield from bot.sendMessage(chat_id, reply)
@asyncio.coroutine
def bot_on_unknown_command(bot, chat_id, cmd, msg):
    """Tell the chat that ``cmd`` is not a registered command."""
    notice = "Unknown Command: {cmd}".format(cmd=cmd)
    yield from bot.sendMessage(chat_id, notice)
@asyncio.coroutine
def bot_on_user_join(bot, chat_id, msg):
    """Announce the first name found in ``msg['new_chat_participant']`` to the chat."""
    newcomer = msg['new_chat_participant']
    announcement = "New User: {name}".format(name=newcomer['first_name'])
    yield from bot.sendMessage(chat_id, announcement)
@asyncio.coroutine
def bot_on_user_leave(bot, chat_id, msg):
    """Announce that the user in ``msg['left_chat_participant']`` left the chat."""
    departed = msg['left_chat_participant']
    announcement = "{name} Left".format(name=departed['first_name'])
    yield from bot.sendMessage(chat_id, announcement)
def main():
    """Build a CommandAwareBot, attach the sample handlers and run the event loop forever."""
    token = "TELEGRAM_BOT_TOKEN"  # Get this from @BotFather
    bot = CommandAwareBot(token)

    bot.add_command_handler('/start', command_start)

    # Pair each setter with the sample callback it should install.
    registrations = (
        (bot.set_on_message_handler, bot_on_message),
        (bot.set_on_unknown_command_handler, bot_on_unknown_command),
        (bot.set_on_user_join_handler, bot_on_user_join),
        (bot.set_on_user_leave_handler, bot_on_user_leave),
    )
    for register, callback in registrations:
        register(callback)

    event_loop = asyncio.get_event_loop()
    event_loop.create_task(bot.messageLoop())
    print("Listening...")

    event_loop.run_forever()
# Run the sample bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment