Created
June 15, 2024 17:32
-
-
Save abc1763613206/a40c8ae6b9e8e3cb9c30a715aeb6d05c to your computer and use it in GitHub Desktop.
GZCTF 通知推送至 QQ 群脚本,适配最新推送协议,改编自 @NanoApe
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Credits to @NanoApe
# Requires websocket-client==0.57.0; newer versions cause problems! (websocket-client==0.57.0 更高版本会出问题!)
# Original Source: https://gist.github.com/Konano/204ef1e442b3f239663985696fbcadf9
import time | |
import _thread as thread | |
from websocket import WebSocketApp | |
import websocket | |
import requests | |
import json | |
import logging | |
import json | |
# --- Runtime configuration: fill these in before running the script ---

# Game id interpolated into the negotiate and WebSocket URLs (``game=...``).
GAME_ID = 1
# Cookie string sent with the negotiate request and the WebSocket handshake;
# append the actual token value after the ``=``.
COOKIE = "GZCTF_Token="
# Cleared by WSSClient.on_error on KeyboardInterrupt; the reconnect loop in
# the ``__main__`` section stops once this is False.
RUNNING = True
# URL that sendMsg POSTs to.
PUSH_ENDPOINT = ""
# Value sent as the "token" field of the push request body.
PUSH_TOKEN = ""
# Value sent as the "send_to_group" field of the push request body.
# The published script left the right-hand side blank, which is a syntax
# error; None is an explicit "not configured" placeholder.
# TODO: set to the target group identifier expected by the push endpoint.
GROUP_GENERAL = None

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
def sendMsg(content, group):
    """POST *content* to the push endpoint for delivery to *group*.

    The request body is JSON with the fields ``token`` (PUSH_TOKEN),
    ``send_to_group`` (*group*) and ``content`` (*content*).

    Delivery is best-effort: any failure (network error, non-200 status) is
    logged and swallowed so that a push outage never takes down the caller.

    Args:
        content: Message text to deliver.
        group: Target group identifier, forwarded as ``send_to_group``.

    Returns:
        None.
    """
    try:
        r = requests.post(
            PUSH_ENDPOINT,
            json={
                "token": PUSH_TOKEN,
                "send_to_group": group,
                "content": content,
            },
            # (connect timeout, read timeout) in seconds.
            timeout=(15, 30),
        )
        # Pass values as logging arguments instead of pre-formatting with
        # ``%``: this stays correct when the value is a tuple and skips the
        # formatting work when the level is disabled.
        if r.status_code == 200:
            logging.info("sendMsg: %s", content)
        else:
            logging.error("sendMsgerror: %s", r.text)
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary for pushes.
        logging.error("sendMsgerror: %s", e)
""" | |
def deal(msg, emoji): | |
return f":{emoji}: " + msg.replace("⌈", "`").replace("⌋", "`") | |
""" | |
def getToken():
    """Negotiate a hub connection and return its ``connectionToken``.

    POSTs to the hub's negotiate URL for GAME_ID with COOKIE as the
    ``cookie`` header. On any failure (network error, HTTP error status,
    malformed or incomplete JSON) the error is logged and the request is
    retried after 60 seconds; the function only returns once a token has
    been obtained.

    Returns:
        The ``connectionToken`` value from the negotiate response.
    """
    while True:
        try:
            r = requests.post(
                f"https://website/hub/user/negotiate?game={GAME_ID}&negotiateVersion=1",
                headers={"cookie": COOKIE},
                # (connect timeout, read timeout) in seconds.
                timeout=(15, 30),
            )
            # Surface HTTP failures (e.g. an expired cookie) as a clear
            # status error instead of an opaque JSON/KeyError below.
            r.raise_for_status()
            return r.json()["connectionToken"]
        except Exception as e:
            # Deliberately broad: every failure mode leads to the same retry.
            logging.error("getTokenerror: %s", e)
            time.sleep(60)
class WSSClient(object):
    """WebSocket client that relays game notices from the hub to sendMsg.

    The wire format handled here (a ``{"protocol":"json","version":1}``
    handshake, ``{"type":6}`` keep-alives, records terminated by ``\\x1e``)
    matches the SignalR JSON hub protocol.

    The callbacks take no WebSocketApp argument, which matches the
    websocket-client 0.57.0 pin noted in the file header.
    NOTE(review): confirm the callback signatures before upgrading that
    library.
    """

    # Every record on the wire is terminated by this separator; a single
    # WebSocket frame may carry more than one record.
    RECORD_SEPARATOR = "\x1e"

    # Notice type -> prefix; the notice values are concatenated after it.
    _PLAIN_PREFIXES = {
        "Normal": "💬 新公告发布:",
        "NewChallenge": "🆕 新题目发布:",
        "NewHint": "💡 新提示发布:",
    }

    # Notice type -> (medal, rank label); values are [solver, challenge].
    _BLOOD_NOTICES = {
        "FirstBlood": ("🥇", "一血"),
        "SecondBlood": ("🥈", "二血"),
        "ThirdBlood": ("🥉", "三血"),
    }

    def __init__(self, connectionToken):
        """Build the hub URL for GAME_ID and the negotiated *connectionToken*."""
        super().__init__()
        self.url = f"wss://website/hub/user?game={GAME_ID}&id={connectionToken}"
        self.ws = None  # WebSocketApp, created in start()
        self.status = False  # True once the first keep-alive has been seen

    @staticmethod
    def _split_records(frame):
        """Return the non-empty records contained in one received frame."""
        return [part for part in frame.split(WSSClient.RECORD_SEPARATOR) if part]

    @staticmethod
    def _format_notice(arg):
        """Return the text to push for one notice dict, or None to skip it.

        *arg* is expected to carry a ``type`` string and a ``values`` list.
        Unknown types, and blood notices with fewer than two values, yield
        None.
        """
        kind = arg.get("type")
        values = [str(v) for v in (arg.get("values") or [])]
        prefix = WSSClient._PLAIN_PREFIXES.get(kind)
        if prefix is not None:
            return prefix + "".join(values)
        blood = WSSClient._BLOOD_NOTICES.get(kind)
        if blood is not None and len(values) >= 2:
            medal, rank = blood
            return (
                medal
                + " 恭喜 ["
                + values[0]
                + "] 获得题目 ["
                + values[1]
                + "] 的"
                + rank
                + "!"
            )
        return None

    def on_message(self, message):
        """Handle one received text frame.

        The frame is split on the record separator and each record is
        processed independently: the empty ``{}`` handshake acknowledgement
        is ignored, ``type == 6`` keep-alives are echoed back, and every
        recognised entry in a record's ``arguments`` list is forwarded to
        sendMsg for GROUP_GENERAL. Undecodable records are logged and
        skipped.
        """
        for record in self._split_records(message):
            if record == "{}":
                # Handshake acknowledgement; nothing to do.
                continue
            try:
                payload = json.loads(record)
            except json.JSONDecodeError as e:
                logging.error("on_message: undecodable record %r: %s", record, e)
                continue
            if not isinstance(payload, dict):
                continue
            if payload.get("type") == 6:
                if not self.status:
                    logging.info("####### on_connect #######")
                    self.status = True
                # Answer the keep-alive with the same record.
                self.ws.send(record + self.RECORD_SEPARATOR)
                continue
            logging.info("message: %s", record)
            # Records without "arguments" carry no notices.
            for arg in payload.get("arguments") or []:
                if not isinstance(arg, dict):
                    continue
                text = self._format_notice(arg)
                if text is not None:
                    sendMsg(text, GROUP_GENERAL)

    def on_error(self, error):
        """Log *error*, or request shutdown when it is a KeyboardInterrupt."""
        global RUNNING
        if isinstance(error, KeyboardInterrupt):
            # Tells the reconnect loop not to start another connection.
            RUNNING = False
        else:
            logging.error("####### on_error #######")
            logging.error("%s: %s", error.__class__.__name__, error)

    def on_close(self):
        """Log that the connection was closed."""
        logging.warning("####### on_close #######")

    def on_open(self):
        """Start the handshake in a separate thread once the socket is open."""
        logging.info("####### on_open #######")
        thread.start_new_thread(self.run, ())

    def run(self, *args):
        """Send the JSON protocol handshake after a one-second delay."""
        time.sleep(1)
        self.ws.send('{"protocol":"json","version":1}\x1e')

    def start(self):
        """Create the WebSocketApp and block until the connection ends."""
        # websocket.enableTrace(True)  # Enable tracing; useful when debugging.
        self.ws = WebSocketApp(
            self.url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            cookie=COOKIE,
        )
        self.ws.run_forever()
if __name__ == "__main__":
    # Reconnect loop: negotiate a token, run one connection until it ends,
    # then either stop (RUNNING was cleared) or wait a minute and retry.
    keep_going = True
    while keep_going:
        client = WSSClient(getToken())
        client.start()
        keep_going = RUNNING
        if keep_going:
            time.sleep(60)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
sendMsg 部分请根据自己机器人情况定义。