Last active
December 30, 2018 01:59
-
-
Save RedL0tus/bbb5e8d12860a1252f363ddfddad6f10 to your computer and use it in GitHub Desktop.
Telegram Spammer as a Service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- encoding: utf-8 -*- | |
# Telegram Spammer as a Service - Test how fast your anti-spam bot is | |
# Copyright (C) 2018 Kay Lin <[email protected]> | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <https://www.gnu.org/licenses/>. | |
__author__ = 'Kay Lin <[email protected]>' | |
import os | |
import json | |
import time | |
import logging | |
from pyrogram import Client, Filters, Error | |
# Pyrogram client shared by the handlers below.
# NOTE(review): 'homo' is presumably the Pyrogram session name - confirm.
app = Client('homo') | |
# Notice posted into groups by spam(); the `{user}` placeholder is filled in
# with the requester label through SPAM.format(user=requester).
SPAM = '''\ | |
{user} 请求测试本群的 CAPTCHA 系统,下面是 spam 正文: | |
同性交友,排解寂寞 | |
https://github.com/ | |
此帐号是一个用于测试 CAPTCHA bot 封禁速度的 user bot,详细信息请私聊。 | |
''' | |
# Usage text sent by on_message() for any private text message that is not a
# `'spam ` command. It is sent with parse_mode='HTML', hence the <code> tag.
HELP = '''\ | |
这是一个用以测试群组防 Spam 措施(主要是 CAPTCHA 类)反应速度的 userbot。 | |
使用方法: | |
<code>'spam [群组链接(如果是私有群)或群组用户名(公开群组)]{1,5}</code> | |
注:[]{1,5} 为正则语法,表示至少一个最多五个 | |
请不要滥用,请仅在管理员同意之后使用本 bot,也请各位管理员不要封禁此帐号,(不然我会很头疼...)。 | |
本 bot 是开源程序,完整源码可在这里( https://gist.github.com/RedL0tus/bbb5e8d12860a1252f363ddfddad6f10 )找到,使用 GPLv3+ 协议发布,如要重新使用请遵守许可条款。 | |
如遇到问题请咨询 @TheSaltedFish。 | |
''' | |
# Fallback reply used by spam_wrapper() when the generated report is empty.
UNKNOWN_ERROR = '''\ | |
蜜汁错误 | |
''' | |
# Reply used by spam_wrapper() when the requester's previous request is less
# than 60 seconds old.
RATE_LIMIT_NOTICE = '''\ | |
您已被流量限制,请一分钟后再试 | |
''' | |
# Reply used by spam_wrapper() when the command carries no target.
TOO_FEW_ARGUMENTS = '''\ | |
至少需要一个用户名或链接 | |
''' | |
# Reply used by spam_wrapper() when the command carries more than 5 targets.
TOO_MANY_ARGUMENTS = '''\ | |
一次最多只能 spam 五个群 | |
''' | |
# Banner printed once at start-up by the `__main__` section.
LICENSE_NOTICE = '''\ | |
Spammer as a Service version 1, Copyright (C) 2018 Kay Lin | |
Licensed under the terms of the GNU General Public License v3 or later (GPLv3+) | |
''' | |
# Requester id -> Unix timestamp of that requester's last completed request.
RATE_LIMIT = {}
# Every handled request, in order; dumped to the JSON log after each request.
HISTORY = list()
# Process start time in whole seconds; used to name the log file.
START_TIME = int(time.time())
class FailedToJoinChat(Exception):
    """Raised by spam() when joining the requested chat fails."""
class FailedToSpam(Exception):
    """Raised by spam() when sending the notice to a group fails."""
class NotAGroup(Exception):
    """Raised by spam() when the join attempt fails with an IndexError."""
class FailedToQuit(Exception):
    """Raised by spam() when leaving a chat after posting fails."""
def spam(client, chat_id, requester):
    """Join ``chat_id``, post the SPAM notice to group dialogs, then leave.

    Args:
        client: The Pyrogram client used for every API call.
        chat_id: Invite link or username of the chat to join.
        requester: Label of the requesting user, substituted into ``SPAM``.

    Raises:
        FailedToJoinChat: ``client.join_chat`` raised a Pyrogram ``Error``.
        NotAGroup: ``client.join_chat`` raised ``IndexError``.
        FailedToSpam: Sending the notice to a group dialog failed.
        FailedToQuit: Leaving a dialog failed.
    """
    logger = logging.getLogger('Spamming Procedure')
    logger.info('>>> Trying to join %s', chat_id)
    try:
        client.join_chat(chat_id)
    except Error as e:
        raise FailedToJoinChat(e) from e
    except IndexError as e:
        raise NotAGroup('Not a valid group') from e

    notice = SPAM.format(user=requester)
    # NOTE(review): this walks every dialog of the account, not only the chat
    # joined above, so any group the account is still in also receives the
    # notice and is left - confirm this is intended.
    for dialog in client.get_dialogs().dialogs:
        target = dialog.chat.id
        try:
            if dialog.chat.type in ('group', 'supergroup'):
                client.send_message(target, notice)
        except Error as e:
            # Best-effort cleanup: a failure to leave must not replace the
            # FailedToSpam error the caller knows how to handle.
            try:
                client.leave_chat(target, delete=True)
            except Error as leave_error:
                logger.error(
                    '>>> Failed to quit %s after a failed spam: %s',
                    target, leave_error
                )
            raise FailedToSpam(e) from e
        try:
            client.leave_chat(target, delete=True)
        except Error as e:
            raise FailedToQuit(e) from e
def _reply(client, message, text):
    """Send ``text`` to the chat of ``message`` as a reply to that message."""
    client.send_message(
        message.chat.id, text,
        reply_to_message_id=message.message_id
    )


def spam_wrapper(client, message):
    """Handle one ``'spam`` command: validate, run, report and log it.

    The command text is split on whitespace; everything after the first token
    is treated as a target (1 to 5 allowed). Each target is passed to
    ``spam()``, a summary is sent back to the requester, the requester's
    rate-limit timestamp is refreshed and the whole ``HISTORY`` is rewritten
    to the JSON log file.

    Args:
        client: The Pyrogram client used to reply and to run ``spam()``.
        message: The incoming command message.
    """
    logger = logging.getLogger('Spam Controller')
    user_id = message.from_user.id

    # Refuse requesters whose previous request finished under 60 seconds ago.
    last_request = RATE_LIMIT.get(user_id)
    if last_request is not None and last_request > (time.time() - 60):
        _reply(client, message, RATE_LIMIT_NOTICE)
        return

    if message.from_user.username:
        requester = '@' + message.from_user.username
    else:
        # The id is not a string; convert it before concatenating.
        requester = 'ID: ' + str(user_id)

    # split() without a separator drops the empty tokens that repeated or
    # trailing spaces would otherwise produce.
    chat_ids = message.text.split()[1:]
    if len(chat_ids) > 5:
        logger.error('>>> Too many arguments')
        _reply(client, message, TOO_MANY_ARGUMENTS)
        return
    if not chat_ids:
        logger.error('>>> Too few arguments')
        _reply(client, message, TOO_FEW_ARGUMENTS)
        return

    logger.info('>>> %s requested to spam %s', user_id, chat_ids)
    success = []
    failed = {}
    for chat_id in chat_ids:
        try:
            spam(client, chat_id, requester)
        except (FailedToJoinChat, NotAGroup) as e:
            failed[chat_id] = e
            logger.error('>>> Failed to join %s: %s', chat_id, e)
            continue
        except FailedToSpam as e:
            failed[chat_id] = e
            logger.error('>>> Failed to spam %s: %s', chat_id, e)
            continue
        except FailedToQuit as e:
            # The notice was delivered; only leaving failed, so the target
            # still counts as a success.
            logger.error('>>> Failed to quit %s: %s', chat_id, e)
        logger.info('>>> %s succeed.', chat_id)
        success.append(chat_id)

    report = []
    if success:
        report.append('>>> 成功: %s\n' % ', '.join(success))
    report.extend(
        '\n>>> %s 失败: %s\n' % (chat_id, error)
        for chat_id, error in failed.items()
    )
    response = ''.join(report)
    _reply(client, message, response or UNKNOWN_ERROR)

    RATE_LIMIT[user_id] = int(time.time())
    HISTORY.append({
        'time': time.time(),
        'requester_id': user_id,
        'requester_name': requester,
        'message': message.text,
        'success': success,
        'failed': list(failed),
        'response': response,
    })
    # The log is rewritten in full so the file always holds valid JSON.
    with open('saas-log-%s.log' % START_TIME, 'w') as log_file:
        json.dump(HISTORY, log_file)
@app.on_message(Filters.text & Filters.private)
def on_message(client, message):
    """Dispatch a private text message to the spam command or the help text.

    Messages sent by the account itself are ignored. Text starting with
    ``'spam `` is handed to ``spam_wrapper``; anything else is answered with
    ``HELP`` rendered as HTML.
    """
    listener_log = logging.getLogger('Message Listener')
    if message.from_user.is_self:
        return

    is_spam_command = message.text.startswith("'spam ")
    if is_spam_command:
        spam_wrapper(client, message)
        return

    listener_log.info('>>> Respond with help message')
    client.send_message(
        message.chat.id,
        HELP,
        parse_mode='HTML',
        reply_to_message_id=message.message_id
    )
# Script entry point: print the license banner, set up logging, refuse to
# start if this run's log file already exists, then start the client.
if __name__ == '__main__':
    print(LICENSE_NOTICE)
    logging.basicConfig(level=logging.INFO)
    # Keep the library's own output down to warnings and above.
    logging.getLogger("pyrogram").setLevel(logging.WARNING)
    logger = logging.getLogger(__name__)
    if os.path.exists('saas-log-%s.log' % START_TIME):
        logger.error('>>> Log file already exists.')
        # Raise SystemExit directly: the `exit` helper is injected by the
        # `site` module for interactive use and is not always available.
        raise SystemExit(1)
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment