Skip to content

Instantly share code, notes, and snippets.

@anmolj7
Last active December 28, 2019 20:30
Show Gist options
  • Save anmolj7/c558226f2a52968d7c4fa30adb6d96fd to your computer and use it in GitHub Desktop.
Save anmolj7/c558226f2a52968d7c4fa30adb6d96fd to your computer and use it in GitHub Desktop.
import html
import os

from flask import Flask, request
import telegram
# Switch the working directory so the relative data files opened below
# (admins.txt, groups.txt) resolve inside 'mysite'.
os.chdir('mysite')
# Bot credentials and hosting configuration. The values here look like
# placeholders and presumably must be replaced before deployment.
bot_token = "API KEY"
bot_user_name = "Gotoh_Welcome_Bot"
# Base URL the webhook is registered under; set_webhook() concatenates it
# directly with TOKEN, so it presumably needs a trailing slash -- confirm.
URL = "your hosted url"
TOKEN = bot_token
# Shared Telegram client and Flask application used by every route below.
bot = telegram.Bot(token=TOKEN)
app = Flask(__name__)
# NOTE(review): DATA_URL is not referenced anywhere in the visible code.
DATA_URL = "https://api.github.com/orgs/fedora-infra/repos"
# Welcome template; the single '{}' is filled with the new member's name.
# It is replaced at runtime by the /set_msg command (in memory only).
WELCOME_MSG = "Hello {}. Welcome To The Group. Please read the pinned message."
def store_admin(chat_id):
    """Append ``chat_id`` as a new line to ``admins.txt``.

    The file is created if it does not exist yet. No duplicate check is
    performed here.
    """
    record = "{}\n".format(chat_id)
    with open("admins.txt", "a+") as admins_file:
        admins_file.write(record)
def is_admin(chat_id):
    """Return True if ``chat_id`` is listed in ``admins.txt``.

    Args:
        chat_id: Chat identifier (int or str); compared by its ``str()`` form.

    Returns:
        bool: True when a line of ``admins.txt`` equals ``str(chat_id)``.
        A missing file means nobody has been verified yet, so the result is
        False instead of an error. Without this, the very first ``/verify``
        could never succeed because the admin check itself would crash.
    """
    wanted = str(chat_id)
    try:
        with open('admins.txt') as f:
            # strip() also tolerates '\r\n' endings and stray spaces.
            return any(line.strip() == wanted for line in f)
    except FileNotFoundError:
        return False
def add_group(title, chat_id):
    """Record a group in ``groups.txt`` unless it is already listed.

    Each group is stored as one ``"<title>, <chat_id>"`` line.

    Args:
        title: Group title.
        chat_id: Telegram chat identifier of the group.
    """
    entry = f'{title}, {chat_id}'
    with open('groups.txt', 'a+') as f:
        # 'a+' opens positioned at end-of-file, so rewind before reading;
        # otherwise the existing entries are never seen and every call
        # appends a duplicate line.
        f.seek(0)
        known = {line.rstrip('\n') for line in f}
        if entry not in known:
            # In append mode the write always lands at the end of the file.
            f.write(entry + '\n')
def get_groups():
    """Return a numbered, human-readable list of the known group titles.

    Reads ``groups.txt`` (lines of ``"<title>, <chat_id>"``) and returns one
    ``"Group <n>: <title>"`` line per distinct title, numbered in the order
    the titles first appear in the file.

    Returns:
        str: The formatted listing, or ``''`` when no groups are recorded
        (including when ``groups.txt`` does not exist yet).
    """
    try:
        with open('groups.txt') as f:
            # Split on the LAST ", " so titles that themselves contain
            # ", " are kept intact; skip blank lines.
            titles = [
                line.rstrip('\n').rsplit(', ', 1)[0]
                for line in f
                if line.strip()
            ]
    except FileNotFoundError:
        return ''
    # dict.fromkeys removes duplicates while keeping a stable order
    # (a set would renumber the groups differently on every run).
    unique_titles = dict.fromkeys(titles)
    return ''.join(
        f'Group {number}: {title}\n'
        for number, title in enumerate(unique_titles, start=1)
    )
def get_group_ids():
    """Return the distinct chat ids recorded in ``groups.txt``.

    Each line is expected to look like ``"<title>, <chat_id>"``. The id is
    taken from after the LAST ``", "`` so titles containing ``", "`` do not
    shift the field. Blank or malformed lines are skipped rather than
    aborting the whole read.

    Returns:
        list[int]: Unique chat ids in first-seen order; empty when the file
        is missing or holds no valid entries.
    """
    try:
        with open('groups.txt') as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        return []

    ids = []
    for line in lines:
        _, separator, raw_id = line.strip().rpartition(', ')
        if not separator:
            continue  # blank line or no "<title>, <id>" structure
        try:
            ids.append(int(raw_id))
        except ValueError:
            continue  # id field is not an integer
    # Drop duplicates while keeping a deterministic order.
    return list(dict.fromkeys(ids))
# Greeting sent in reply to /start and as the first part of /help.
_BOT_WELCOME = (
    "\n"
    "GOTOH_Welcome_Bot.\n"
    "This bot is strictly for welcoming new users to group chats.\n"
)

# Extra /help section shown only to verified admins.
_ADMIN_HELP = (
    "\n/push <msg> to push a message to all the groups\n/get_msg Returns the welcome "
    "message\n/set_msg <msg> Sets the custom welcome message for a new user. However, "
    "The text must contain a {} indicating the user name\n/get_groups To get the "
    "number of groups and their titles "
)

# Commands the help text advertises as admin-only; enforced in _handle_command.
_ADMIN_ONLY_COMMANDS = frozenset(('/get_msg', '/set_msg', '/get_groups', '/push'))

# NOTE(review): hard-coded shared secret; consider loading it from configuration.
_VERIFY_KEY = "aSecretKey"


def _parse_command(text):
    """Split message text into ``(command, argument)``.

    The command is the first whitespace-delimited token with any
    ``@BotName`` suffix removed (e.g. ``/help@Gotoh_Welcome_Bot``).
    Returns ``(None, '')`` when the text does not start with ``/``.
    """
    parts = text.strip().split(None, 1)
    if not parts or not parts[0].startswith('/'):
        return None, ''
    command = parts[0].split('@', 1)[0]
    argument = parts[1] if len(parts) > 1 else ''
    return command, argument


def _handle_command(command, argument, chat_id):
    """Execute one bot command and return the reply text (None if no reply)."""
    global WELCOME_MSG

    if command == '/start':
        return _BOT_WELCOME

    if command == '/help':
        if is_admin(chat_id):
            return _BOT_WELCOME + _ADMIN_HELP
        return _BOT_WELCOME

    if command == '/verify':
        # The supplied key is deliberately not logged.
        if is_admin(chat_id):
            return "You're already an admin."
        if argument == _VERIFY_KEY:
            store_admin(chat_id)
            return "Congratulations! You can now control the bot."
        return "Wrong Pass."

    if command not in _ADMIN_ONLY_COMMANDS:
        return None  # unknown command: stay silent

    # Everything below broadcasts, reconfigures, or reveals bot state, so it
    # is restricted to verified chats, as the help text already promises.
    if not is_admin(chat_id):
        return "You are not authorised to use this command."

    if command == '/get_msg':
        return f'"{WELCOME_MSG}"'

    if command == '/set_msg':
        if '{}' not in argument:
            return "Wrong Message Format"
        try:
            # Dry-run with a single positional argument, exactly how the
            # template is used when welcoming, so a broken template is
            # rejected now instead of failing on the next join.
            argument.format('user_name')
        except (IndexError, KeyError, ValueError, AttributeError):
            return "Wrong Message Format"
        WELCOME_MSG = argument
        return f'Welcome Message Successfully set to: "{WELCOME_MSG}"'

    if command == '/get_groups':
        # Telegram rejects empty messages, so always send something.
        return get_groups() or "No groups recorded yet."

    # command == '/push'
    if not argument:
        return "Wrong Message Format"
    for group_id in get_group_ids():
        try:
            bot.sendMessage(chat_id=group_id, text=argument)
        except telegram.error.TelegramError:
            # One unreachable group (e.g. the bot was removed) must not stop
            # delivery to the remaining groups.
            app.logger.exception("Could not push message to chat %s", group_id)
    return "Messages Sent Successfully."


def _welcome_new_members(message):
    """Send the welcome message once for every member that just joined."""
    for member in message.new_chat_members:
        name = '@' + member.username if member.username else member.first_name
        # The message is sent with parse_mode='HTML', so the user-controlled
        # name has to be escaped; the admin-supplied template is left as is.
        text = WELCOME_MSG.format(html.escape(str(name), quote=False))
        try:
            bot.sendMessage(chat_id=message.chat.id, text=text, parse_mode='HTML')
        except telegram.error.TelegramError:
            app.logger.exception("Could not welcome %s", name)


def _handle_text(message):
    """Record the originating group and answer a command, if the text is one."""
    chat_id = message.chat_id
    # The title arrives with the update itself; no extra getChat call needed.
    title = message.chat.title
    if title:
        add_group(title, chat_id)

    command, argument = _parse_command(message.text)
    if command is None:
        return
    reply = _handle_command(command, argument, chat_id)
    if reply:
        bot.sendMessage(chat_id=chat_id, text=reply,
                        reply_to_message_id=message.message_id)


@app.route('/{}'.format(TOKEN), methods=["POST"])
def respond():
    """Telegram webhook endpoint.

    Parses the posted update, welcomes newly joined members, and answers
    bot commands sent as text messages.

    Returns:
        str: Always ``'ok'`` once the payload has been parsed. Handling
        errors are logged instead of propagated so Telegram does not keep
        re-delivering the same update.
    """
    update = telegram.Update.de_json(request.get_json(force=True), bot)
    message = update.message if update is not None else None
    if message is None:
        # Edited messages, channel posts, etc. carry no ``message``.
        return 'ok'

    try:
        if message.new_chat_members:
            _welcome_new_members(message)
        elif message.text:
            app.logger.info("Received text message: %s", message.text)
            _handle_text(message)
    except Exception:
        # Top-level boundary of the webhook: log with traceback, still ack.
        app.logger.exception("Failed to handle update")
    return 'ok'
@app.route('/set_webhook', methods=['GET', 'POST'])
def set_webhook():
    """Register the bot's webhook with Telegram and report the outcome.

    The webhook address is ``URL`` immediately followed by ``TOKEN``.
    """
    hook_address = '{URL}{HOOK}'.format(URL=URL, HOOK=TOKEN)
    registered = bot.setWebhook(hook_address)
    return "webhook setup ok" if registered else "webhook setup failed"
@app.route("/")
def index():
    """Root route; responds with a single dot as the page body."""
    placeholder_body = '.'
    return placeholder_body
# Start Flask's built-in server, with threaded request handling, only when
# this file is executed directly (not when it is imported as a module).
if __name__ == "__main__":
app.run(threaded=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment