-
-
Save sundevilyang/139f158f529a21e4a24844e892ebbb8c to your computer and use it in GitHub Desktop.
响应好友请求 / 自动聊天 / 限制频率 / 邀请入群 / 远程群管理 / 新人欢迎消息 / 关键词问答 / 发心跳 / 远程命令 / 远程执行代码
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# coding: utf-8 | |
import datetime | |
import logging | |
import os | |
import re | |
import subprocess | |
import time | |
from functools import wraps | |
from pprint import pformat | |
from threading import Thread | |
""" | |
psutil 模块用于监控进程状态,例如内存占用情况,请自行安装: | |
pip3 install -U psutil | |
""" | |
import psutil | |
from wxpy import * | |
from wxpy.utils.misc import get_text_without_at_bot | |
logging.basicConfig(level=logging.INFO) | |
# 下面的 console_qr 参数,请自行按需调整 | |
bot = Bot('bot.pkl', console_qr=-2) | |
# 防止登错账号 | |
if 'wxpy' not in bot.self.name: | |
raise ValueError('Wrong User!') | |
# 定义远程管理员 (用于远程管理),使用备注名更安全 | |
remote_admin = ensure_one(bot.friends().search(remark_name='youfou')) | |
# 使用 wxid 找到需要管理的微信群 | |
bot.groups(True) | |
group_ids = ( | |
# wxpy 交流群 🐰 | |
'6411313640@chatroom', | |
# wxpy 交流群 🐱 | |
'6788356306@chatroom', | |
# wxpy 交流群 🐨 | |
'6737430866@chatroom', | |
) | |
wxpy_groups = list() | |
for wxid in group_ids: | |
g = bot.groups().search(wxid=wxid)[0] | |
wxpy_groups.append(g) | |
# 初始化聊天机器人 | |
tuling = Tuling() | |
# 自动回答关键词 | |
kw_replies = { | |
'wxpy 项目主页:\ngithub.com/youfou/wxpy': ( | |
'项目', '主页', '官网', '网站', 'github', '地址', 'repo', '版本' | |
), | |
'wxpy 在线文档:\nwxpy.readthedocs.io': ( | |
'请问', '文档', '帮助', '怎么', '如何', '请教', '安装', '说明' | |
), | |
'必看: 常见问题 FAQ:\nwxpy.readthedocs.io/faq.html': ( | |
'faq', '常见', '问题', '问答', '什么' | |
) | |
} | |
# 新人入群的欢迎语 | |
welcome_text = '''🎉 欢迎 @{} 的加入! | |
😃 请勿在本群使用机器人 | |
📖 提问前请看 t.cn/R6VkJDy''' | |
# 新人入群通知的匹配正则 | |
rp_new_member_name = ( | |
re.compile(r'^"(.+)"通过'), | |
re.compile(r'邀请"(.+)"加入'), | |
) | |
# 远程踢人命令: @<机器人> 移出 @<需要被移出的人> | |
rp_kick = re.compile(r'^@.+移出\s*@(.+?)(?:\u2005?\s*$)') | |
def update_groups(): | |
remote_admin.send('updating groups...') | |
for _group in wxpy_groups: | |
_group.update_group() | |
remote_admin.send('{}: {}'.format(_group.name, len(_group))) | |
return True | |
process = psutil.Process() | |
def status_text(): | |
uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(process.create_time()) | |
memory_usage = process.memory_info().rss | |
return '{uptime}, {memory}'.format( | |
uptime=str(uptime).split('.')[0], | |
memory='{:.2f} MB'.format(memory_usage / 1024 ** 2) | |
) | |
def send_status_text(): | |
return remote_admin.send(status_text()) | |
# 定时报告进程状态 | |
def heartbeat(): | |
while bot.alive: | |
time.sleep(600) | |
# noinspection PyBroadException | |
try: | |
send_status_text() | |
except: | |
logger.exception('failed to report heartbeat:') | |
heartbeat_thread = Thread(target=heartbeat, daemon=True, name='heartbeat') | |
heartbeat_thread.start() | |
def remote_eval(source): | |
try: | |
ret = eval(source, globals()) | |
except (SyntaxError, NameError): | |
return | |
except Exception as e: | |
ret = e | |
logger.info('remote eval executed:\n{}'.format(source)) | |
remote_admin.send(pformat(ret)) | |
return True | |
def remote_shell(cmd): | |
if cmd.startswith('!'): | |
cmd = cmd[1:] | |
logger.info('executing remote shell cmd:\n{}'.format(cmd)) | |
r = subprocess.run( | |
cmd, shell=True, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
universal_newlines=True | |
) | |
if r.stdout: | |
remote_admin.send(r.stdout) | |
else: | |
remote_admin.send('[OK]') | |
return True | |
def restart(): | |
remote_admin.send('restarting bot...') | |
bot.dump_login_status() | |
os.execv(sys.executable, [sys.executable] + sys.argv) | |
# 远程命令 (单独发给机器人的消息) | |
remote_orders = { | |
'groups': update_groups, | |
'status': send_status_text, | |
'restart': restart | |
} | |
# 若消息文本为为远程命令,则执行对应函数 | |
# 若消息文本以 ! 开头,则作为 shell 命令执行 | |
# 若不满足以上,则尝试直接将 msg.text 作为 Python 代码执行 | |
# 若有执行以上任何内容,则返回 True | |
def exec_remote_order(msg): | |
if msg.sender == remote_admin: | |
order = remote_orders.get(msg.text.lower().strip()) | |
if order: | |
logger.info('executing remote order: {}'.format(order.__name__)) | |
order() | |
return True | |
elif remote_shell(msg.text): | |
return True | |
elif remote_eval(msg.text): | |
return True | |
def reply_by_keyword(msg): | |
for reply, keywords in kw_replies.items(): | |
for kw in keywords: | |
if kw in msg.text.lower(): | |
logger.info('reply by keyword: \n{}: "{}"\nreplied: "{}"'.format( | |
(msg.member or msg.chat).name, msg.text, reply)) | |
msg.reply(reply) | |
return reply | |
# 验证入群口令 | |
def valid(msg): | |
return 'wxpy' in msg.text.lower() | |
# 自动选择未满的群 | |
def get_group(): | |
wxpy_groups.sort(key=len, reverse=True) | |
for _group in wxpy_groups: | |
if len(_group) < 495: | |
return _group | |
else: | |
logger.warning('群都满啦!') | |
return wxpy_groups[-1] | |
# 邀请入群 | |
def invite(user): | |
joined = list() | |
for group in wxpy_groups: | |
if user in group: | |
joined.append(group) | |
if joined: | |
joined_nick_names = '\n'.join(map(lambda x: x.nick_name, joined)) | |
logger.info('{} is already in\n{}'.format(user, joined_nick_names)) | |
user.send('你已加入了\n{}'.format(joined_nick_names)) | |
else: | |
group = get_group() | |
user.send('验证通过 [嘿哈]') | |
group.add_members(user, use_invitation=True) | |
# 限制频率: 指定周期内超过消息条数,直接回复 "🙊" | |
def freq_limit(period_secs=10, limit_msgs=3): | |
def decorator(func): | |
@wraps(func) | |
def wrapped(msg): | |
now = datetime.datetime.now() | |
period = datetime.timedelta(seconds=period_secs) | |
recent_received = 0 | |
for m in msg.bot.messages[::-1]: | |
if m.sender == msg.sender: | |
if now - m.create_time > period: | |
break | |
recent_received += 1 | |
if recent_received > limit_msgs: | |
if not isinstance(msg.chat, Group) or msg.is_at: | |
return '🙊' | |
return func(msg) | |
return wrapped | |
return decorator | |
def get_new_member_name(msg): | |
# itchat 1.2.32 版本未格式化群中的 Note 消息 | |
from itchat.utils import msg_formatter | |
msg_formatter(msg.raw, 'Text') | |
for rp in rp_new_member_name: | |
match = rp.search(msg.raw['Text']) | |
if match: | |
return match.group(1) | |
def remote_kick(msg): | |
if msg.is_at and msg.type is TEXT: | |
match = rp_kick.search(msg.text) | |
if match: | |
if msg.member != remote_admin: | |
raise ValueError('Wrong admin: {}'.format(msg.member)) | |
name_to_kick = match.group(1) | |
member_to_kick = ensure_one(list(filter( | |
lambda x: x.name == name_to_kick, msg.chat))) | |
if member_to_kick in (bot.self, remote_admin): | |
raise ValueError('Wrong member to kick: {}'.format(member_to_kick)) | |
else: | |
member_to_kick.remove() | |
msg.chat.send('已移出 {}'.format(name_to_kick)) | |
return True | |
def semi_sync(msg, groups): | |
if msg.is_at: | |
msg.text = get_text_without_at_bot(msg) | |
if msg.text: | |
sync_message_in_groups( | |
msg, groups, suffix='↑隔壁消息↑回复请@机器人') | |
# 判断消息是否为支持回复的消息类型 | |
def supported_msg_type(msg, reply_unsupported=False): | |
supported = (TEXT,) | |
ignored = (SYSTEM, NOTE, FRIENDS) | |
fallback_replies = { | |
RECORDING: '🙉', | |
PICTURE: '🙈', | |
VIDEO: '🙈', | |
} | |
if msg.type in supported: | |
return True | |
elif msg.type not in ignored and reply_unsupported: | |
msg.reply(fallback_replies.get(msg.type, '🐒')) | |
# 响应好友请求 | |
@bot.register(msg_types=FRIENDS) | |
def new_friends(msg): | |
user = msg.card.accept() | |
if valid(msg): | |
invite(user) | |
else: | |
user.send('Hello {},你忘了填写加群口令,快回去找找口令吧'.format(user.name)) | |
# 响应好友消息,限制频率 | |
@bot.register(Friend) | |
@freq_limit() | |
def exist_friends(msg): | |
if supported_msg_type(msg, reply_unsupported=True): | |
if valid(msg): | |
invite(msg.sender) | |
return | |
elif reply_by_keyword(msg): | |
return | |
tuling.do_reply(msg) | |
@bot.register(remote_admin, msg_types=TEXT) | |
def reply_remote_admin(msg): | |
""" | |
响应远程管理员 | |
内容解析方式优先级: | |
1. 若为远程命令,则执行远程命令 (额外定义,一条命令对应一个函数) | |
2. 若消息文本以 ! 开头,则作为 shell 命令执行 | |
3. 尝试作为 Python 代码执行 (可执行大部分 Python 代码) | |
4. 若以上不满足或尝试失败,则作为普通聊天内容回复 | |
""" | |
# 上述的 1. 和 2. | |
if exec_remote_order(msg): | |
return | |
# 上述的 3. | |
return exist_friends(msg) | |
# 在其他群中回复被 @ 的消息 | |
@bot.register(Group) | |
def reply_other_group(msg): | |
if msg.chat not in wxpy_groups and msg.is_at: | |
if supported_msg_type(msg, reply_unsupported=True): | |
tuling.do_reply(msg) | |
# wxpy 群的消息处理 | |
@bot.register(wxpy_groups) | |
def wxpy_group(msg): | |
if msg.is_at and remote_kick(msg): | |
return | |
semi_sync(msg, wxpy_groups) | |
# 新人欢迎消息 | |
@bot.register(wxpy_groups, NOTE) | |
def welcome(msg): | |
name = get_new_member_name(msg) | |
if name: | |
return welcome_text.format(name) | |
def get_logger(level=logging.DEBUG, file='bot.log', mode='w'): | |
log_formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') | |
log_formatter_lite = logging.Formatter('%(name)s:%(levelname)s:%(message)s') | |
_logger = logging.getLogger() | |
for hdlr in _logger.handlers: | |
_logger.removeHandler(hdlr) | |
# 输出到文件 | |
if file: | |
file_hdlr = logging.FileHandler(file, mode) | |
file_hdlr.setFormatter(log_formatter) | |
_logger.addHandler(file_hdlr) | |
# 输出到屏幕 | |
console_hdlr = logging.StreamHandler() | |
console_hdlr.setLevel(logging.WARNING) | |
console_hdlr.setFormatter(log_formatter) | |
_logger.addHandler(console_hdlr) | |
# 输出到远程管理员微信 | |
wechat_hdlr = WeChatLoggingHandler(remote_admin) | |
wechat_hdlr.setLevel(logging.WARNING) | |
wechat_hdlr.setFormatter(log_formatter_lite) | |
_logger.addHandler(wechat_hdlr) | |
# 将未捕捉异常也发送到日志中 | |
sys.excepthook = lambda *args: logger.critical( | |
'UNCAUGHT EXCEPTION:', exc_info=args) | |
for m in 'requests', 'urllib3': | |
logging.getLogger(m).setLevel(logging.WARNING) | |
_logger.setLevel(level) | |
return _logger | |
logger = get_logger() | |
send_status_text() | |
bot.dump_login_status() | |
bot.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment