Created
March 27, 2022 05:26
-
-
Save iuriguilherme/d94347d8315b9c7b0f0141c8093def36 to your computer and use it in GitHub Desktop.
Web form with telegram bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## This code serves a web form that sends messages to Telegram users
## Install: pip install aiogram quart flask-wtf
## Run: QUART_APP=app:app quart run
import asyncio
import logging
import os
import secrets

from aiogram import (
    Bot,
    Dispatcher,
)
## NOTE: keep the quart import (which pulls in flask_patch) ahead of
## flask_wtf, as in the original ordering.
from quart import (
    Quart,
    current_app,
    flask_patch,
    jsonify,
    request,
)
from flask_wtf import FlaskForm
from wtforms import (
    StringField,
    SubmitField,
    TextAreaField,
)

logger = logging.getLogger(__name__)
## The bot token is a credential: read it from the TELEGRAM_BOT_TOKEN
## environment variable so a real token never has to live in the source.
## The literal is the example placeholder value and is only used as a
## fallback when the variable is not set.
token = os.environ.get(
    "TELEGRAM_BOT_TOKEN",
    "123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11",
)
bot = Bot(token=token)
dispatcher = Dispatcher(bot)
app = Quart(__name__)
## A new random secret key is generated every time this module is loaded.
app.secret_key = secrets.token_urlsafe(32)
@app.before_serving
async def before_serving():
    """Expose the dispatcher on the app and start Telegram polling.

    Registered with ``app.before_serving``. The polling coroutine is
    scheduled as a background task on the running event loop rather than
    awaited, so this hook returns immediately.
    """
    ## There are other ways to do this, I do it like this because I then
    ## can access the dispatcher object with current_app.dispatcher
    current_app.dispatcher = dispatcher
    ## We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    ## You can start the webhooks here instead
    polling_task = loop.create_task(
        dispatcher.start_polling(
            reset_webhook=True,
            timeout=20,
            relax=0.1,
            fast=True,
            allowed_updates=None,
        )
    )
    ## The event loop only keeps weak references to tasks; hold a strong
    ## reference on the app so the polling task cannot be garbage-collected
    ## while it is still running.
    current_app.polling_task = polling_task
@app.after_serving
async def after_serving():
    """Close the dispatcher's storage and wait until it has closed.

    Registered with ``app.after_serving``.
    """
    storage = dispatcher.storage
    await storage.close()
    await storage.wait_closed()
class SendMessageForm(FlaskForm):
    """Form with a target chat id, a message text and a submit button."""

    ## Chat id the message is sent to; the view converts it with int().
    chat_id_field = StringField(
        'select chat',
        default=1,
    )
    ## Body of the message to send.
    text_field = TextAreaField(
        'message',
        default="Nevermind",
    )
    submit = SubmitField('send')
@app.route('/', methods=['GET', 'POST'])
async def send_message():
    """Render the send-message form and, on POST, send the message.

    On POST the submitted chat id is converted with ``int()`` and the text
    with ``str()``, then passed to ``dispatcher.bot.send_message``.

    Returns:
        The HTML form as a string, or, when sending fails, a JSON response
        containing ``repr()`` of the exception that was raised.
    """
    form = SendMessageForm(formdata=await request.form)
    if request.method == "POST":
        try:
            await dispatcher.bot.send_message(
                chat_id=int(form.chat_id_field.data),
                text=str(form.text_field.data),
            )
        except Exception as exception:
            ## Deliberate catch-all: any failure (bad chat id, API error)
            ## is reported back to the client instead of crashing the view.
            ## Log it as well so the traceback is not lost.
            logger.exception("Failed to send Telegram message")
            return jsonify(repr(exception))
    return f"""
<form action="" method="post">
<p>
{form.chat_id_field.label}<br>
{form.chat_id_field(size=5, placeholder = form.chat_id_field.description)}<br>
</p>
<p>{form.text_field.label}<br />{form.text_field}</p>
<p>{form.submit}</p>
</form>
"""
## Only start the built-in server when this file is executed directly.
## Launching with `QUART_APP=app:app quart run` imports this module, and an
## unguarded app.run() would start a server during that import.
if __name__ == "__main__":
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment