Created
June 2, 2019 11:20
-
-
Save spookyahell/6ab3725d1b4a38eebc878932e4ae43ab to your computer and use it in GitHub Desktop.
python telegram Bot tools - import some more advanced features easily into any bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from telegram.ext import CommandHandler | |
import speedtest | |
import time | |
def ping_pong(dispatcher): | |
def ping_pong_func(bot, update): | |
if update.message.text.startswith('/ping'): | |
update.message.reply_text('Pong!') | |
else: | |
update.message.reply_text('Ping!') | |
dispatcher.add_handler(CommandHandler(['ping','pong'],ping_pong_func)) | |
return dispatcher | |
allowed_ids = [] | |
class CustomCommandHandler(CommandHandler): | |
def handle_update(self, update, dispatcher): | |
"""Send the update to the :attr:`callback`. | |
Args: | |
update (:class:`telegram.Update`): Incoming telegram update. | |
dispatcher (:class:`telegram.ext.Dispatcher`): Dispatcher that originated the Update. | |
""" | |
optional_args = self.collect_optional_args(dispatcher, update) | |
if update.message.from_user.id not in globals()['allowed_ids']: | |
update.message.reply_text('Terms not accepted. Please accept the terms first. Read about them at /terms.\n\ | |
Or if you A) already know them or B) literally just do not care about your data privacy go ahead and /acceptterms \ | |
(And ofc you can read them after accepting too)') | |
return | |
message = update.message or update.edited_message | |
if self.pass_args: | |
optional_args['args'] = message.text.split()[1:] | |
return self.callback(dispatcher.bot, update, **optional_args) | |
def require_terms(dispatcher, TERMS, projectID = 'projectAlpha', language = 'en'): | |
global allowed_ids | |
if 'allowed_ids' not in globals(): | |
allowed_ids = [] | |
after_term = {'en':'Accept terms with /acceptterms, if you deny them there is nothing else left for you to do.', | |
'de':'Akzeptiere die Bestimmungen mit /acceptterms, wenn Sie nicht einverstanden sind, musst du nichts weiter tun.',} | |
already_accepted = {'en':'You already accepted these terms.', | |
'de':'Sie haben die Bedingungen bereits akzeptiert.',} | |
if language in after_term: | |
the_after_term = after_term[language] | |
else: | |
the_after_term = after_term['en'] | |
if language in already_accepted: | |
the_accepted_after_term = already_accepted[language] | |
else: | |
the_accepted_after_term = already_accepted['en'] | |
def send_terms(bot, update): | |
term_msg_txt = '`' + TERMS + '`' | |
if not update.message.from_user.id in globals()['allowed_ids']: | |
term_msg_txt += '\n' + the_after_term | |
else: | |
term_msg_txt += '\n(' + the_accepted_after_term + ')' | |
update.message.reply_text(term_msg_txt, parse_mode = 'Markdown') | |
def accept_terms(bot, update): | |
allowed_ids.append(update.message.from_user.id) | |
update.message.reply_text('Thanks for agreeing to comply with the terms.') | |
dispatcher.add_handler(CommandHandler('terms', send_terms)) | |
dispatcher.add_handler(CommandHandler('acceptterms', accept_terms)) | |
return dispatcher | |
def include_speedtest(dispatcher, limit = 5, CommandHandler = CommandHandler): | |
global last_speedtest | |
if 'last_speedtest' not in globals(): | |
last_speedtest = {} | |
def do_speedtest(provideImage): | |
servers = [] | |
# If you want to test against a specific server | |
# servers = [1234] | |
s = speedtest.Speedtest() | |
s.get_servers(servers) | |
s.get_best_server() | |
s.download() | |
s.upload() | |
if provideImage: | |
return s.results.share() | |
else: | |
results = s.results.dict() | |
upload_MiB = results['upload'] / 8388608 | |
download_MiB = results['download'] / 8388608 | |
uploaded_MiB = results['bytes_sent'] / 1048576 | |
downloaded_MiB = results['bytes_received'] / 1048576 | |
text = '*Server*: ' + results['server']['sponsor'] + ' (' + results['server']['name'] + ', '+ results['server']['country'] + ')\n' | |
text += '*Host*: `' + results['server']['host'] + '`\n' | |
text += f'*Upload*: {upload_MiB:0.02f} MiB/s (Data transferred: {uploaded_MiB:0.02f} MiB)\n' | |
text += f'*Download*: {download_MiB:0.02f} MiB/s (Data transferred: {downloaded_MiB:0.02f} MiB)' | |
return text | |
def speedtest_cmd(bot, update, imageMode = False): | |
def wait_time(timebetween, time_wait): | |
seconds_left = time_between_required-time_wait | |
minutes, seconds = divmod(seconds_left, 60) | |
if minutes > 0: | |
return f'{minutes:0.00f}m{seconds:0.02f}s' | |
else: | |
return f'{seconds_left:0.02f}s' | |
if update.message.from_user.id in last_speedtest: | |
time_between_required = limit * 60 | |
if time.time() - last_speedtest[update.message.from_user.id] >= time_between_required: | |
bot.send_chat_action(update.message.chat.id, 'typing') | |
result = do_speedtest(imageMode) | |
update.message.reply_text(result, parse_mode = 'Markdown') | |
last_speedtest[update.message.from_user.id] = time.time() | |
else: | |
time_wait = time.time() - last_speedtest[update.message.from_user.id] | |
update.message.reply_text(f'You must wait {wait_time(time_between_required,time_wait)} before sending a new speedtest command') | |
else: | |
bot.send_chat_action(update.message.chat.id, 'typing') | |
result = do_speedtest(imageMode) | |
update.message.reply_text(result, parse_mode = 'Markdown') | |
last_speedtest[update.message.from_user.id] = time.time() | |
def speedtest_cmd_image(bot, update): | |
return speedtest_cmd(bot, update, imageMode = True) | |
dispatcher.add_handler(CommandHandler('speedtest', speedtest_cmd)) | |
dispatcher.add_handler(CommandHandler('speedtestImage', speedtest_cmd_image)) | |
return dispatcher |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment