Skip to content

Instantly share code, notes, and snippets.

@spookyahell
Created June 2, 2019 11:20
Show Gist options
  • Save spookyahell/6ab3725d1b4a38eebc878932e4ae43ab to your computer and use it in GitHub Desktop.
Save spookyahell/6ab3725d1b4a38eebc878932e4ae43ab to your computer and use it in GitHub Desktop.
Python Telegram bot tools - easily import some more advanced features into any bot
from telegram.ext import CommandHandler
import speedtest
import time
def ping_pong(dispatcher):
    """Register a combined ``/ping`` / ``/pong`` command on ``dispatcher``.

    ``/ping`` is answered with ``Pong!``; anything else routed to the handler
    (i.e. ``/pong``) is answered with ``Ping!``.

    Args:
        dispatcher: Object exposing ``add_handler``; one ``CommandHandler``
            covering both commands is added to it.

    Returns:
        The same ``dispatcher`` that was passed in.
    """

    def ping_pong_func(bot, update):
        """Reply with the counterpart of the command that was sent."""
        # Command names are matched case-insensitively by the handler (see the
        # python-telegram-bot CommandHandler docs), so '/PING' also lands
        # here. Lower-case the text so it still gets 'Pong!' instead of
        # falling through to the '/pong' reply.
        if update.message.text.lower().startswith('/ping'):
            reply = 'Pong!'
        else:
            reply = 'Ping!'
        update.message.reply_text(reply)

    dispatcher.add_handler(CommandHandler(['ping', 'pong'], ping_pong_func))
    return dispatcher
# User ids that have accepted the terms. The /acceptterms handler registered by
# require_terms() appends to this list; CustomCommandHandler and the /terms
# handler read it. No persistence is visible in this file, so the list
# presumably starts empty on every process start.
allowed_ids = []
class CustomCommandHandler(CommandHandler):
    """``CommandHandler`` that runs its callback only for users who accepted the terms.

    A user counts as having accepted when their id is present in the
    module-level ``allowed_ids`` list. Everyone else receives a notice
    pointing at ``/terms`` and ``/acceptterms`` and the callback is skipped.
    """

    def handle_update(self, update, dispatcher):
        """Send the update to the :attr:`callback` if the sender accepted the terms.

        Args:
            update (:class:`telegram.Update`): Incoming telegram update.
            dispatcher (:class:`telegram.ext.Dispatcher`): Dispatcher that originated the Update.

        Returns:
            Whatever :attr:`callback` returns, or ``None`` when the sender has
            not accepted the terms and the callback was not invoked.
        """
        optional_args = self.collect_optional_args(dispatcher, update)

        # Resolve the message before touching it: for an edited-message update
        # ``update.message`` is None, so checking ``update.message.from_user``
        # first would raise AttributeError.
        message = update.message or update.edited_message

        if message.from_user.id not in allowed_ids:
            message.reply_text(
                'Terms not accepted. Please accept the terms first. Read about them at /terms.\n'
                'Or if you A) already know them or B) literally just do not care about your data privacy go ahead and /acceptterms '
                '(And ofc you can read them after accepting too)'
            )
            return None

        if self.pass_args:
            # Everything after the command word is handed over as ``args``.
            optional_args['args'] = message.text.split()[1:]
        return self.callback(dispatcher.bot, update, **optional_args)
def require_terms(dispatcher, TERMS, projectID='projectAlpha', language='en'):
    """Register the ``/terms`` and ``/acceptterms`` commands on ``dispatcher``.

    ``/terms`` replies with ``TERMS`` wrapped in backticks (sent with the
    ``Markdown`` parse mode), followed by a hint that depends on whether the
    sender's id is already in the module-level ``allowed_ids`` list.
    ``/acceptterms`` records the sender's id in ``allowed_ids``.

    Args:
        dispatcher: Object exposing ``add_handler``.
        TERMS: Text of the terms, as a ``str``.
            NOTE(review): a backtick inside ``TERMS`` would end the code span
            early - confirm callers never pass one.
        projectID: Accepted for interface compatibility; not used by this
            function.
        language: Key selecting the hint language (``'en'`` or ``'de'``);
            unknown values fall back to English.

    Returns:
        The same ``dispatcher`` that was passed in.
    """
    global allowed_ids
    # Defensive: recreate the registry if the module-level list is missing.
    if 'allowed_ids' not in globals():
        allowed_ids = []

    after_term = {
        'en': 'Accept terms with /acceptterms, if you deny them there is nothing else left for you to do.',
        'de': 'Akzeptiere die Bestimmungen mit /acceptterms, wenn Sie nicht einverstanden sind, musst du nichts weiter tun.',
    }
    already_accepted = {
        'en': 'You already accepted these terms.',
        'de': 'Sie haben die Bedingungen bereits akzeptiert.',
    }
    # Unknown languages fall back to the English wording.
    the_after_term = after_term.get(language, after_term['en'])
    the_accepted_after_term = already_accepted.get(language, already_accepted['en'])

    def send_terms(bot, update):
        """Reply with the terms plus an accept hint or an already-accepted note."""
        term_msg_txt = '`' + TERMS + '`'
        if update.message.from_user.id in allowed_ids:
            term_msg_txt += '\n(' + the_accepted_after_term + ')'
        else:
            term_msg_txt += '\n' + the_after_term
        update.message.reply_text(term_msg_txt, parse_mode='Markdown')

    def accept_terms(bot, update):
        """Record the sender as having accepted the terms and confirm it."""
        user_id = update.message.from_user.id
        # Repeating /acceptterms must not pile up duplicate ids in the list.
        if user_id not in allowed_ids:
            allowed_ids.append(user_id)
        update.message.reply_text('Thanks for agreeing to comply with the terms.')

    dispatcher.add_handler(CommandHandler('terms', send_terms))
    dispatcher.add_handler(CommandHandler('acceptterms', accept_terms))
    return dispatcher
def include_speedtest(dispatcher, limit=5, CommandHandler=CommandHandler):
    """Register rate-limited ``/speedtest`` and ``/speedtestImage`` commands.

    ``/speedtest`` replies with a text summary of a speed test run;
    ``/speedtestImage`` replies with whatever ``results.share()`` returns.
    Each user may start a new test only once every ``limit`` minutes; the
    time of the last completed test per user id is kept in the module-level
    ``last_speedtest`` dict.

    Args:
        dispatcher: Object exposing ``add_handler``.
        limit: Minimum number of minutes between two tests of the same user.
        CommandHandler: Handler class used to register both commands; pass a
            different class (for example a ``CommandHandler`` subclass) to
            change how the commands are gated.

    Returns:
        The same ``dispatcher`` that was passed in.
    """
    global last_speedtest
    # Create the per-user timestamp registry on first use.
    if 'last_speedtest' not in globals():
        last_speedtest = {}

    def do_speedtest(provideImage):
        """Run one speed test and return the share result or a Markdown summary."""
        servers = []
        # If you want to test against a specific server
        # servers = [1234]
        s = speedtest.Speedtest()
        s.get_servers(servers)
        s.get_best_server()
        s.download()
        s.upload()
        if provideImage:
            return s.results.share()

        results = s.results.dict()
        server = results['server']
        # 8388608 == 8 * 1024 ** 2 and 1048576 == 1024 ** 2; presumably the
        # rates are bit/s and the byte counters plain bytes (see the
        # speedtest-cli docs), which makes both results MiB-based.
        upload_MiB = results['upload'] / 8388608
        download_MiB = results['download'] / 8388608
        uploaded_MiB = results['bytes_sent'] / 1048576
        downloaded_MiB = results['bytes_received'] / 1048576
        lines = [
            '*Server*: ' + server['sponsor'] + ' (' + server['name'] + ', ' + server['country'] + ')',
            '*Host*: `' + server['host'] + '`',
            f'*Upload*: {upload_MiB:.2f} MiB/s (Data transferred: {uploaded_MiB:.2f} MiB)',
            f'*Download*: {download_MiB:.2f} MiB/s (Data transferred: {downloaded_MiB:.2f} MiB)',
        ]
        return '\n'.join(lines)

    def wait_time(time_between, time_waited):
        """Format the remaining wait (in seconds) as ``XmY.YYs`` or ``Y.YYs``."""
        # Uses its own parameters; the previous version silently relied on a
        # variable of the enclosing scope and ignored its first argument.
        seconds_left = time_between - time_waited
        minutes, seconds = divmod(seconds_left, 60)
        if minutes > 0:
            return f'{minutes:.0f}m{seconds:.2f}s'
        return f'{seconds_left:.2f}s'

    def speedtest_cmd(bot, update, imageMode=False):
        """Run a speed test for the sender unless they are still rate-limited."""
        user_id = update.message.from_user.id
        time_between_required = limit * 60

        last_run = last_speedtest.get(user_id)
        if last_run is not None:
            time_wait = time.time() - last_run
            if time_wait < time_between_required:
                update.message.reply_text(f'You must wait {wait_time(time_between_required, time_wait)} before sending a new speedtest command')
                return

        bot.send_chat_action(update.message.chat.id, 'typing')
        result = do_speedtest(imageMode)
        update.message.reply_text(result, parse_mode='Markdown')
        # The cool-down starts when the test has finished, not when it began.
        last_speedtest[user_id] = time.time()

    def speedtest_cmd_image(bot, update):
        """Same as ``speedtest_cmd`` but replies with the share result."""
        return speedtest_cmd(bot, update, imageMode=True)

    dispatcher.add_handler(CommandHandler('speedtest', speedtest_cmd))
    dispatcher.add_handler(CommandHandler('speedtestImage', speedtest_cmd_image))
    return dispatcher
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment