Created
September 19, 2022 16:08
-
-
Save Konano/204ef1e442b3f239663985696fbcadf9 to your computer and use it in GitHub Desktop.
Websocket-based notificatons push service for GZCTF platform to Discord.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import _thread as thread | |
from websocket import WebSocketApp | |
import websocket | |
import requests | |
import json | |
from log import logger | |
GAME_ID = 1 | |
COOKIE = 'GZCTF_Token=' | |
CHANNEL_ANNOUNCEMENTS = 'https://discord.com/api/webhooks/' | |
CHANNEL_SOLVES = 'https://discord.com/api/webhooks/' | |
RUNNING = True | |
def sendMsg(content, url): | |
requests.post(url, data={'content': content}, timeout=(15, 30)) | |
def deal(msg, emoji): | |
return f':{emoji}: ' + msg.replace('⌈', '`').replace('⌋', '`') | |
def getToken(): | |
while True: | |
try: | |
r = requests.post( | |
f'https://website/hub/user/negotiate?game={GAME_ID}&negotiateVersion=1', headers={'cookie': COOKIE}, timeout=(15, 30)) | |
return r.json()['connectionToken'] | |
except Exception as e: | |
logger.error('getTokenerror: %s' % e) | |
time.sleep(60) | |
class WSSClient(object): | |
def __init__(self, connectionToken): | |
super(WSSClient, self).__init__() | |
self.url = f'wss://website/hub/user?game={GAME_ID}&id={connectionToken}' | |
self.ws = None | |
self.status = False | |
def on_message(self, message): | |
# logger.debug('####### on_message #######') | |
if message.startswith('{"type":6}'): | |
if not self.status: | |
logger.info('####### on_connect #######') | |
self.status = True | |
self.ws.send(message) | |
return | |
if message.startswith('{}'): | |
return | |
logger.info('message: %s' % message) | |
status = json.loads(message[:-1]) | |
for arg in status['arguments']: | |
if arg['type'] == 'NewChallenge': | |
sendMsg(deal(arg['content'], 'new'), CHANNEL_ANNOUNCEMENTS) | |
if arg['type'] == 'NewHint': | |
sendMsg(deal(arg['content'], 'bulb'), CHANNEL_ANNOUNCEMENTS) | |
if arg['type'] == 'FirstBlood': | |
sendMsg(deal(arg['content'], 'first_place'), CHANNEL_SOLVES) | |
if arg['type'] == 'SecondBlood': | |
sendMsg(deal(arg['content'], 'second_place'), CHANNEL_SOLVES) | |
if arg['type'] == 'ThirdBlood': | |
sendMsg(deal(arg['content'], 'third_place'), CHANNEL_SOLVES) | |
def on_error(self, error): | |
if isinstance(error, KeyboardInterrupt): | |
global RUNNING | |
RUNNING = False | |
else: | |
logger.error('####### on_error #######') | |
logger.error('%s: %s', error.__class__.__name__, error) | |
def on_close(self): | |
logger.warning('####### on_close #######') | |
def on_open(self): | |
logger.info('####### on_open #######') | |
thread.start_new_thread(self.run, ()) | |
def run(self, *args): | |
time.sleep(1) | |
self.ws.send('{"protocol":"json","version":1}\x1e') | |
def start(self): | |
# websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。 | |
self.ws = WebSocketApp(self.url, | |
on_open=self.on_open, | |
on_message=self.on_message, | |
on_error=self.on_error, | |
on_close=self.on_close, | |
cookie=COOKIE) | |
self.ws.run_forever() | |
if __name__ == '__main__': | |
while True: | |
WSSClient(getToken()).start() | |
if not RUNNING: | |
break | |
time.sleep(60) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import logging | |
import colorlog | |
# BASIC_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' | |
COLOR_FORMAT = '%(log_color)s%(asctime)s - %(levelname)s - %(message)s' | |
DATE_FORMAT = None | |
# basic_formatter = logging.Formatter(BASIC_FORMAT, DATE_FORMAT) | |
color_formatter = colorlog.ColoredFormatter(COLOR_FORMAT, DATE_FORMAT) | |
class MaxFilter: | |
def __init__(self, max_level): | |
self.max_level = max_level | |
def filter(self, record): | |
if record.levelno <= self.max_level: | |
return True | |
chlr = logging.StreamHandler(stream=sys.stdout) | |
chlr.setFormatter(color_formatter) | |
chlr.setLevel('INFO') | |
chlr.addFilter(MaxFilter(logging.INFO)) | |
ehlr = logging.StreamHandler(stream=sys.stderr) | |
ehlr.setFormatter(color_formatter) | |
ehlr.setLevel('WARNING') | |
logger = logging.getLogger(__name__) | |
logger.setLevel('DEBUG') | |
logger.addHandler(chlr) | |
logger.addHandler(ehlr) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment