Skip to content

Instantly share code, notes, and snippets.

@Konano
Created September 19, 2022 16:08
Show Gist options
  • Save Konano/204ef1e442b3f239663985696fbcadf9 to your computer and use it in GitHub Desktop.
Save Konano/204ef1e442b3f239663985696fbcadf9 to your computer and use it in GitHub Desktop.
Websocket-based notificatons push service for GZCTF platform to Discord.
import time
import _thread as thread
from websocket import WebSocketApp
import websocket
import requests
import json
from log import logger
GAME_ID = 1
COOKIE = 'GZCTF_Token='
CHANNEL_ANNOUNCEMENTS = 'https://discord.com/api/webhooks/'
CHANNEL_SOLVES = 'https://discord.com/api/webhooks/'
RUNNING = True
def sendMsg(content, url):
requests.post(url, data={'content': content}, timeout=(15, 30))
def deal(msg, emoji):
return f':{emoji}: ' + msg.replace('⌈', '`').replace('⌋', '`')
def getToken():
while True:
try:
r = requests.post(
f'https://website/hub/user/negotiate?game={GAME_ID}&negotiateVersion=1', headers={'cookie': COOKIE}, timeout=(15, 30))
return r.json()['connectionToken']
except Exception as e:
logger.error('getTokenerror: %s' % e)
time.sleep(60)
class WSSClient(object):
def __init__(self, connectionToken):
super(WSSClient, self).__init__()
self.url = f'wss://website/hub/user?game={GAME_ID}&id={connectionToken}'
self.ws = None
self.status = False
def on_message(self, message):
# logger.debug('####### on_message #######')
if message.startswith('{"type":6}'):
if not self.status:
logger.info('####### on_connect #######')
self.status = True
self.ws.send(message)
return
if message.startswith('{}'):
return
logger.info('message: %s' % message)
status = json.loads(message[:-1])
for arg in status['arguments']:
if arg['type'] == 'NewChallenge':
sendMsg(deal(arg['content'], 'new'), CHANNEL_ANNOUNCEMENTS)
if arg['type'] == 'NewHint':
sendMsg(deal(arg['content'], 'bulb'), CHANNEL_ANNOUNCEMENTS)
if arg['type'] == 'FirstBlood':
sendMsg(deal(arg['content'], 'first_place'), CHANNEL_SOLVES)
if arg['type'] == 'SecondBlood':
sendMsg(deal(arg['content'], 'second_place'), CHANNEL_SOLVES)
if arg['type'] == 'ThirdBlood':
sendMsg(deal(arg['content'], 'third_place'), CHANNEL_SOLVES)
def on_error(self, error):
if isinstance(error, KeyboardInterrupt):
global RUNNING
RUNNING = False
else:
logger.error('####### on_error #######')
logger.error('%s: %s', error.__class__.__name__, error)
def on_close(self):
logger.warning('####### on_close #######')
def on_open(self):
logger.info('####### on_open #######')
thread.start_new_thread(self.run, ())
def run(self, *args):
time.sleep(1)
self.ws.send('{"protocol":"json","version":1}\x1e')
def start(self):
# websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。
self.ws = WebSocketApp(self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
cookie=COOKIE)
self.ws.run_forever()
if __name__ == '__main__':
while True:
WSSClient(getToken()).start()
if not RUNNING:
break
time.sleep(60)
import sys
import logging
import colorlog
# BASIC_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
COLOR_FORMAT = '%(log_color)s%(asctime)s - %(levelname)s - %(message)s'
DATE_FORMAT = None
# basic_formatter = logging.Formatter(BASIC_FORMAT, DATE_FORMAT)
color_formatter = colorlog.ColoredFormatter(COLOR_FORMAT, DATE_FORMAT)
class MaxFilter:
def __init__(self, max_level):
self.max_level = max_level
def filter(self, record):
if record.levelno <= self.max_level:
return True
chlr = logging.StreamHandler(stream=sys.stdout)
chlr.setFormatter(color_formatter)
chlr.setLevel('INFO')
chlr.addFilter(MaxFilter(logging.INFO))
ehlr = logging.StreamHandler(stream=sys.stderr)
ehlr.setFormatter(color_formatter)
ehlr.setLevel('WARNING')
logger = logging.getLogger(__name__)
logger.setLevel('DEBUG')
logger.addHandler(chlr)
logger.addHandler(ehlr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment