Last active
May 2, 2023 16:21
-
-
Save woshishabii/518b890714d43cd4be3a104ab74dc3ab to your computer and use it in GitHub Desktop.
src for MTF Bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3.10 | |
import telepot | |
import time | |
import urllib3 | |
import sqlalchemy.orm | |
import datetime | |
import pytz | |
import pprint | |
# Every print() call in this module goes through pprint.pprint from here on.
print = pprint.pprint

VERSION = '0.0.1'

# Get token from file: the whole content of ./tg_token, minus surrounding
# whitespace, is used as the bot token.
with open('./tg_token') as token_file:
    BOT_TOKEN = token_file.read().strip()
# Init ORM: a SQLite database at ./tg_bot.db (SQL echo disabled) and the
# declarative base class that the models below derive from.
# NOTE(review): check_same_thread=False is presumably there so the connection
# can be used from the bot's message-handling thread -- confirm.
engine = sqlalchemy.create_engine(
    'sqlite:///./tg_bot.db?check_same_thread=False',
    echo=False,
)
Base = sqlalchemy.orm.declarative_base()
class Report(Base):
    """ORM model for one submitted report, stored in the ``report`` table."""

    __tablename__ = 'report'

    # Integer primary key.
    ID = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    # Twitter ID of the reported account (up to 100 characters).
    twitter_id = sqlalchemy.Column(sqlalchemy.String(100))
    # Free-text description of the report (up to 1000 characters).
    description = sqlalchemy.Column(sqlalchemy.String(1000))
    # Review flag for the report.
    reviewed = sqlalchemy.Column(sqlalchemy.Boolean)
    # Creation time in Asia/Shanghai. The default must be a callable:
    # passing ``datetime.now(...)`` directly would be evaluated once at
    # import time and stamp every row with the process start time.
    time = sqlalchemy.Column(
        sqlalchemy.DateTime(timezone=True),
        default=lambda: datetime.datetime.now(tz=pytz.timezone("Asia/Shanghai")),
    )
# Create the tables declared on Base if they do not exist yet, then open the
# single module-wide session that the command functions share.
Base.metadata.create_all(bind=engine, checkfirst=True)
session_maker = sqlalchemy.orm.sessionmaker(bind=engine)
session = session_maker()
# You can leave this bit out if you're using a paid PythonAnywhere account
# Replace telepot's connection pools so every request goes through the proxy.
proxy_url = "http://proxy.server:3128"
telepot.api._pools = dict(
    default=urllib3.ProxyManager(
        proxy_url=proxy_url,
        num_pools=3,
        maxsize=10,
        retries=False,
        timeout=30,
    ),
)
telepot.api._onetime_pool_spec = (
    urllib3.ProxyManager,
    {
        'proxy_url': proxy_url,
        'num_pools': 1,
        'maxsize': 1,
        'retries': False,
        'timeout': 30,
    },
)
# end of the stuff that's only needed for free accounts

# Create the bot client and print the bot's own account info.
bot = telepot.Bot(BOT_TOKEN)
print(bot.getMe())
def help(command=''):
    """
    Show Help

    Without an argument, list every registered command together with the
    first line of its docstring. Help for a single command is not
    implemented yet.

    :param command: help for specified command
    :return: the help text to send back to the user
    """
    # Guard clause: per-command help is still a placeholder.
    if command:
        return "TODO: Not implemented"

    banner = f"--===MTF Bot🏳️⚧️===--\nVersion:{VERSION}\nCommands:\n"
    summaries = []
    for name, func in COMMANDS.items():
        # Only the first docstring line is shown as the command summary.
        first_line = func.__doc__.strip().splitlines()[0]
        summaries.append(f'{name}: {first_line}')
    return banner + '\n'.join(summaries)
def submit(twitter_id, description):
    """
    Submit a Report

    Store a new, not-yet-reviewed report and return a confirmation message.

    :param twitter_id: Twitter ID of the abuser
    :param description: Description
    :return: confirmation text echoing the stored values
    :raises Exception: whatever the database layer raises on a failed
        commit; the shared session is rolled back before re-raising
    """
    new_report = Report(twitter_id=twitter_id, description=description, reviewed=False)
    session.add(new_report)
    try:
        session.commit()
    except Exception:
        # The session is shared by every command; without a rollback a failed
        # commit would leave it unusable for all later requests.
        session.rollback()
        raise
    return (
        "===========================\n"
        "Submit Successful:\n"
        f"Twitter: https://twitter.com/{twitter_id}\n"
        f"Description: {description}\n"
        "==========================="
    )
def query(twitter_id):
    """
    Query by twitter ID

    Return every reviewed report whose Twitter ID contains ``twitter_id``
    as a substring.

    :param twitter_id: Twitter ID
    :return: a header line followed by one entry per matching report,
        entries separated by a blank line
    """
    matches = (
        session.query(Report)
        # autoescape=True makes '%' and '_' typed by the user match literally
        # instead of acting as LIKE wildcards.
        .filter(Report.twitter_id.contains(twitter_id, autoescape=True))
        .filter(Report.reviewed.is_(True))
        .all()
    )
    entries = [
        f'Twitter: https://twitter.com/{row.twitter_id}\nDescription: {row.description}'
        for row in matches
    ]
    # The header needs its own line; previously the first entry was glued
    # directly onto it.
    return '--===Result===--\n' + '\n\n'.join(entries)
# Command registry: maps the command name typed by the user (without the
# leading '/') to the function that implements it.
# Register your commands here after declaring them above.
# Note that:
#   - the arguments are passed in as str
#   - a command should return a non-empty string
#   - a command should have a docstring (its first line is shown by help)
COMMANDS = dict(
    submit=submit,
    help=help,
    query=query,
)
# functions to parse the command | |
def parse_command(*args):
    """
    Dispatch a tokenized command line to the matching entry in COMMANDS.

    :param args: the command name followed by its string arguments
    :return: the command's result, or an error message when the command is
        missing, unknown, or called with the wrong number of arguments
    """
    # An empty call used to raise IndexError on args[0]; treat it like an
    # unknown command instead.
    if not args or args[0] not in COMMANDS:
        return 'Invalid Command, pls check it'

    func = COMMANDS[args[0]]
    params = args[1:]

    # Accepted argument count: all positional parameters at most, minus the
    # ones that have default values at least.
    max_args = func.__code__.co_argcount
    min_args = max_args - len(func.__defaults__ or ())
    if min_args <= len(params) <= max_args:
        return func(*params)
    return f"Invalid Argument Length(required [{min_args}, {max_args}]), pls check it"
# Handle messages came in | |
def handle(msg):
    """
    Handle one incoming Telegram message.

    Text messages starting with '/' are tokenized and dispatched through
    parse_command, and the result is sent back to the chat. Other text
    messages get a hint about the missing '/'. Non-text messages are ignored.

    :param msg: the message dict delivered by telepot
    """
    # parse the 'msg' using telepot.glance
    content_type, chat_type, chat_id = telepot.glance(msg)
    print(msg)
    if content_type != 'text':
        return

    text = msg['text']
    if not text.startswith('/'):
        bot.sendMessage(chat_id, "It seems that you missed the '/'")
        return

    # Split on any run of whitespace: split(' ') turned repeated or trailing
    # spaces into empty-string arguments (e.g. "/query " matched everything).
    # A bare "/" yields no tokens, so fall back to an empty command name,
    # which parse_command reports as an invalid command.
    parsed_command = text[1:].split() or ['']
    print(parsed_command)
    bot.sendMessage(chat_id, parse_command(*parsed_command))
# Register handle() as the callback for incoming messages.
bot.message_loop(handle)
print('Listening ...')

# Keep the program running.
while True:
    time.sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment