Created
March 17, 2020 16:43
-
-
Save xmoiduts/fedc0309f571041d40fbcac321cde3c2 to your computer and use it in GitHub Desktop.
Micropython ESP32 Telegram Bot async API uasyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Async Telegram-bot interface inspired by https://github.com/Lepeshka92/TelegaGraph .
# Async examples: https://github.com/micropython/micropython-lib/blob/master/uasyncio/example_http_client.py
# Do NOT use their ping-pong test case: it did not work for me.
import gc | |
import ujson | |
import uasyncio as asyncio | |
import uerrno | |
class Telegram_API:
    """Minimal asynchronous Telegram Bot API client built on uasyncio.

    Every API call opens a fresh TLS connection to ``api.telegram.org``,
    issues a single HTTP/1.0 GET request and parses the JSON body of the
    reply.
    """

    # Characters that may appear unescaped inside a query-string component;
    # everything else is percent-encoded by _quote().
    _UNRESERVED = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                   'abcdefghijklmnopqrstuvwxyz'
                   '0123456789-_.~')

    def __init__(self, TOKEN):
        """Store the bot token and the default keyboard / polling settings.

        TOKEN: the bot token without the leading ``bot`` prefix.
        """
        self.hostname = 'api.telegram.org'
        self.bot_token = 'bot' + TOKEN
        # Reply keyboard sent along with a message when send() is given one.
        self.kbd = {
            'keyboard': [
                ['Good Morning', 'Nap 40 Minutes'],
                ['Good Night', 'HW reboot'],
            ],  # TODO: Split this out
            'resize_keyboard': True,
            'one_time_keyboard': False,
        }
        # Parameters of the getUpdates long-polling request.
        self.upd = {
            'offset': 0,
            'limit': 1,
            'timeout': 60,  # Known issue: Max 50s timeout for Telegram long polling.
            'allowed_updates': ['message'],  # TODO: inlineQuery support ?
        }
        self.writer = None
        self.reader = None

    def setKbdKeys(self, kbd_keys: list):
        """Replace the rows of buttons of the stored reply keyboard."""
        self.kbd['keyboard'] = kbd_keys

    def setTimeout(self, timeout: int):
        """Set the long-polling timeout (seconds) used by update()."""
        self.upd['timeout'] = timeout

    @staticmethod
    def _quote(value):
        """Percent-encode str(value) as UTF-8 for use in a query string."""
        safe = Telegram_API._UNRESERVED
        out = []
        for byte in str(value).encode('utf-8'):
            char = chr(byte)
            # Bytes >= 0x80 map to non-ASCII characters, which are never in
            # `safe`, so multi-byte UTF-8 sequences are always escaped.
            out.append(char if char in safe else '%{:02X}'.format(byte))
        return ''.join(out)

    def dict2querystring(self, data: dict):
        """Serialise a dict into a URL query string.

        in:  {'limit':20, 'offset':20333, 'id':'abc'}
        out: limit=20&offset=20333&id=abc

        Keys and values are percent-encoded so that spaces, '&', ':' and
        JSON punctuation cannot corrupt the HTTP request line. List and dict
        values are JSON-serialised first; all other values go through str().
        Returns '' for an empty dict.
        """
        pairs = []
        for key, value in data.items():
            if isinstance(value, (dict, list)):
                value = ujson.dumps(value)
            pairs.append('{}={}'.format(self._quote(key), self._quote(value)))
        return '&'.join(pairs)

    async def Request_GET(self, action, query_string) -> dict:
        """Send a GET request to the Telegram server and parse the reply.

        action:       API method name, e.g. sendMessage, getUpdates
        query_string: already-encoded query, e.g. chat_id=777555333&text=received

        Returns the response body parsed from JSON, or None when the
        connection closed before the end of the HTTP response head.
        Connection and JSON parsing errors propagate to the caller; the
        stream is closed in every case.
        """
        # gc.collect()  # collect as you wish.
        reader, writer = await asyncio.open_connection(self.hostname, 443, True)
        try:
            query = "GET /{}/{}?{} HTTP/1.0\r\n\r\n".format(
                self.bot_token, action, query_string)
            await writer.awrite(query.encode('latin-1'))
            # Skip the response head: it ends at the first blank line.
            while True:
                line = await reader.readline()
                if not line:
                    return None  # stream ended inside the response head
                if line == b'\r\n':
                    break
            return ujson.loads(await reader.read())
        finally:
            # Close both ends even on early return or error, so a failed
            # request does not leave the connection open.
            try:
                await reader.aclose()
            finally:
                await writer.aclose()

    async def send(self, chat_id, text, keyboard: list = None, silent=True):
        """Send a text message; failures are printed, never raised.

        chat_id:  target chat.
        text:     message text.
        keyboard: optional rows of button labels; when given it replaces the
                  stored keyboard and is attached as reply_markup.
        silent:   value sent as disable_notification.
        """
        data = {'chat_id': chat_id, 'text': text, 'disable_notification': silent}
        # NOTE(review): indentation was lost in the source this was restored
        # from; reply_markup is assumed to be attached only when a keyboard
        # is passed - confirm.
        if keyboard:
            self.kbd['keyboard'] = keyboard
            data['reply_markup'] = ujson.dumps(self.kbd)
        try:
            jo = await self.Request_GET('sendMessage', self.dict2querystring(data))
            print('OK:' + str(jo['ok']))
        except Exception as e:
            # Deliberately best-effort: a failed send must not stop the caller.
            print('TG_Send err: {}'.format(e))

    async def update(self):
        """Poll getUpdates once.

        Returns a list of (chat_id, username, text) tuples, possibly empty,
        or None when the request failed or the API replied without a
        'result' field. Updates lacking the expected message fields are
        skipped. After a reply containing updates, the stored offset is moved
        past the last one so it is not delivered again.
        """
        try:
            jo = await self.Request_GET('getUpdates', self.dict2querystring(self.upd))
            # print('TG_update: {}'.format(jo))
        except Exception as e:
            # e.args may be empty, so guard before indexing it.
            if e.args and e.args[0] == -104:
                print('TG_Updat e: Conn Reset')  # Guess: -104 connection reset
            else:
                print('TG_Updat e: {}'.format(e))
                # When wlan is not connected this prints 'list index out of range'.
            return None
        if jo is None:
            # Connection closed before a body arrived.
            return None
        if 'result' not in jo:
            # Error reply: report it as a failure so listen() backs off.
            print('TG_update: API error: {}'.format(jo.get('description', jo)))
            return None
        result = []
        updates = jo['result']
        for item in updates:
            try:
                message = item['message']
                chat = message['chat']
                result.append((chat['id'],
                               chat.get('username', 'notset'),
                               message['text']))
            except KeyError:
                print('TG_update: skiping an unqualified message')
        if updates:
            # Confirm everything received, including skipped updates, by
            # moving the offset past the last update id.
            self.upd['offset'] = updates[-1]['update_id'] + 1
        return result

    async def listen(self, handler=None, retry_delay=1):
        """Poll for messages forever, passing each non-empty batch to handler.

        handler:     optional coroutine function called with the list
                     returned by update().
        retry_delay: seconds to sleep after a failed poll, so a lost
                     connection does not turn into a tight retry loop.
        """
        while True:
            try:
                messages = await self.update()
                if messages is None:
                    await asyncio.sleep(retry_delay)
                elif messages:
                    print(messages)
                    # await self.send(messages[0][0], messages[0][2])  # ping-pong the received message
                    if handler is not None:
                        await handler(messages)  # TODO: CONFIRM UPDATE before HW reboot command.
            finally:
                gc.collect()
                # print('free mem: {} Bytes'.format(gc.mem_free()))

    # TODO possible caveats:
    #   ctrl+c as well as unhandled exceptions in other coroutines can leave
    #   reader/writers open, causing ENOMEM OSError if you try to restart the
    #   loop without soft reboot. This can't be aclose()'d using try-except
    #   schemes since your KeyboardInterrupt will mostly hit other executing
    #   coroutines.
# Test
'''
from tg_async_api import *
tg = Telegram_API(TOKEN='123456789:AbcdefgHijKLmnOPqrsT8')
loop = asyncio.get_event_loop()
loop.create_task(tg.listen())
loop.run_forever()
#loop.create_task(tg.send(your_id_int, 'hello'))
#loop.create_task(tg.update())
#loop.run_forever()
'''
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment