Created
December 13, 2018 11:00
-
-
Save sephii/ed45f1049c1cf51458cd5aefdc3de50a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# cstrike plugin for python-rtmbot (https://github.com/slackapi/python-rtmbot)
import glob | |
import os | |
import re | |
import random | |
import socket | |
import struct | |
from valve.source.a2s import ServerQuerier | |
# --- Plugin configuration ---------------------------------------------------
# (host, port) of the game server; used for player queries and for rcon.
SERVER_INFO = ('<example.com>', 27015)
# Password sent along with every rcon command.
RCON_PASSWORD = '<Your rcon password>'
# Seconds between two runs of the ``say_players`` job (see ``crontable``).
SAY_PLAYERS_INTERVAL = 60
# Channel that receives the periodic "players are online" announcement.
TARGET_CHANNEL_ID = '<C048TFP2F>'
# Base directory of the game installation; map files are globbed below it.
CSTRIKE_DIR = '/home/steam/cstrike'

# Shared querier used by every function that needs the player list.
sq = ServerQuerier(SERVER_INFO)

# [interval, function name] pairs -- presumably read by rtmbot's scheduler.
crontable = [[SAY_PLAYERS_INTERVAL, 'say_players']]
# [channel, message] pairs queued by the handlers -- presumably drained by
# rtmbot after each callback.
outputs = []
def say_players():
    """Announce the player count when the server has just become populated.

    Queries the server for its players and does nothing when it is empty.
    A message is queued for ``TARGET_CHANNEL_ID`` only if the
    longest-connected player has been on the server for less than
    ``SAY_PLAYERS_INTERVAL`` seconds.
    """
    status = sq.players()
    if status['player_count'] == 0:
        return

    longest_session = max(p['duration'] for p in status['players'])
    # Everyone joined after the previous run, so the server was (most likely)
    # empty back then and the announcement is news.
    if longest_session < SAY_PLAYERS_INTERVAL:
        announcement = (
            "There are %d players on the server. Go go go!"
            % status['player_count']
        )
        outputs.append([TARGET_CHANNEL_ID, announcement])
def changelevel(data, cmdargs):
    """Handle ``!map <name>``: switch the server to another map through rcon.

    Args:
        data: Incoming message event; the reply is queued for
            ``data['channel']``.
        cmdargs: Text typed after the command, expected to be a map name.

    The map name comes straight from a chat message, so it is validated
    before being interpolated into the rcon command line. Without that check
    a message such as ``!map de_dust; quit`` would run arbitrary console
    commands on the server.
    """
    map_name = cmdargs.strip()

    if not map_name:
        msg = "Usage: !map <map name>"
    elif not re.match(r'[A-Za-z0-9_.$\-]+\Z', map_name):
        # Whitelist: anything outside these characters (spaces, ';', quotes,
        # newlines...) could terminate the command and start another one.
        msg = "Invalid map name"
    else:
        resp = rcon(SERVER_INFO, RCON_PASSWORD, 'changelevel %s' % map_name)
        if 'changelevel failed' in resp:
            msg = "Map %s not found on the server" % map_name
        else:
            msg = "Map changed to %s" % map_name

    outputs.append([data['channel'], msg])
def maps(data, cmdargs):
    """Handle ``!maps``: reply with the installed maps, one per line, sorted.

    Map names are the base names (extension removed) of the ``*.bsp`` files
    found under ``cstrike/maps`` inside ``CSTRIKE_DIR``. ``cmdargs`` is
    ignored.
    """
    pattern = os.path.join(CSTRIKE_DIR, 'cstrike/maps/*.bsp')
    names = sorted(
        os.path.splitext(os.path.basename(path))[0]
        for path in glob.glob(pattern)
    )
    outputs.append([data['channel'], '\n'.join(names)])
def process_message(data):
    """Dispatch a ``!command [args]`` chat message to its handler.

    Events without a ``text`` field, or whose text does not start with
    ``!``, are ignored, as are unknown commands. Everything after the first
    space is handed to the handler as ``cmdargs`` (empty string when there
    is no space).
    """
    if 'text' not in data or not data['text'].startswith('!'):
        return

    # partition() yields (text, '', '') when there is no space, which gives
    # the same command / empty-args split as the no-argument case.
    command, _, cmdargs = data['text'].partition(' ')

    handlers = {
        'players': show_players,
        'map': changelevel,
        'maps': maps,
        'ranking': show_ranking,
        'slap': slap,
    }
    handler = handlers.get(command.lstrip('!'))
    if handler is not None:
        handler(data, cmdargs)
def show_players(data, cmdargs):
    """Handle ``!players``: reply with the text built by ``players()``.

    ``cmdargs`` is ignored.
    """
    channel = data['channel']
    scoreboard = players()
    outputs.append([channel, scoreboard])
def players():
    """Build the scoreboard message for the players currently connected.

    Returns:
        A native ``str``: either a notice that the server is empty, or a
        header line followed by one ``<position> <name> (<score>)`` line per
        player, highest score first.

    The previous implementation always called ``.encode('utf-8')``, which on
    Python 3 produced a ``bytes`` object (while the empty-server branch
    returned ``str``). The text is now only encoded where ``str`` *is* the
    byte-string type (Python 2), so Python 2 results are unchanged.
    """
    info = sq.players()
    if info['player_count'] == 0:
        return "The server is empty :("

    ranking = sorted(info['players'], key=lambda p: p['score'], reverse=True)

    lines = [u"There are %s players on the server:\n\n" % info['player_count']]
    lines.extend(
        u"{pos} {player} ({score})\n".format(
            pos=pos, player=player['name'], score=player['score']
        )
        for pos, player in enumerate(ranking, 1)
    )
    text = u"".join(lines)

    if str is bytes:  # Python 2: keep returning a UTF-8 byte string
        return text.encode('utf-8')
    return text
def rcon(server, password, cmd):
    """Run a console command on the game server through rcon over UDP.

    The exchange implemented here: send a ``challenge rcon`` request, take
    the first run of digits in the reply as the challenge number, send
    ``rcon <challenge> <password> <cmd>``, then collect reply datagrams until
    the socket times out or an empty datagram arrives.

    Args:
        server: ``(host, port)`` tuple of the game server.
        password: The rcon password.
        cmd: Console command to execute.

    Returns:
        The concatenated reply as a native ``str``. The first five bytes of
        every datagram are dropped (presumably the packet header -- confirm
        against the protocol).

    Raises:
        socket.timeout: If the server does not answer the challenge request.
        ValueError: If the challenge reply contains no challenge number.
    """
    header = b"\xFF\xFF\xFF\xFF"
    # UDP recv() discards whatever does not fit in the buffer, so leave room
    # for a full-size datagram instead of cutting replies at 1024 bytes.
    bufsize = 4096

    def to_bytes(value):
        # Sockets carry bytes; accept both byte strings and text.
        return value if isinstance(value, bytes) else value.encode('utf-8')

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.settimeout(0.5)
        sock.connect(server)

        sock.send(header + b"challenge rcon\n\0")
        reply = sock.recv(bufsize)
        match = re.search(b"[0-9]+", reply)
        if match is None:
            raise ValueError("no rcon challenge number in server reply")

        request = b" ".join(
            [b"rcon", match.group(), to_bytes(password), to_bytes(cmd)]
        )
        sock.send(header + request)

        chunks = []
        while True:
            try:
                chunk = sock.recv(bufsize)
            except socket.timeout:
                # No more datagrams within the timeout: the reply is complete.
                break
            if not chunk:
                break
            chunks.append(chunk[5:])
    finally:
        # Always release the socket, including on timeouts and bad replies.
        sock.close()

    output = b"".join(chunks)
    if not isinstance(output, str):  # Python 3: hand text back to callers
        output = output.decode('utf-8', 'replace')
    return output
def show_ranking(data, cmdargs):
    """Handle ``!ranking``: reply with the stats ranking as a text table.

    One row per entry returned by ``get_ranking()``, preceded by a header
    row. Columns are padded to a common width (the name column is
    left-aligned, all others right-aligned) and the table is wrapped in a
    code fence. ``cmdargs`` is ignored.
    """

    def percent(part, whole):
        # Integer percentage of part/whole; 0 when whole is zero.
        try:
            return int(part / float(whole) * 100)
        except ZeroDivisionError:
            return 0

    table = [['#', 'Name', 'Kills', 'Deaths', 'HS', 'Eff.', 'Acc.', 'HS ratio']]
    for rank, player in enumerate(get_ranking(), 1):
        efficiency = percent(player['kills'], player['kills'] + player['deaths'])
        hs_ratio = percent(player['hs'], player['kills'])
        accuracy = percent(player['hits'], player['shots'])
        table.append([
            rank, player['name'], player['kills'], player['deaths'],
            player['hs'],
            '%d%%' % efficiency,
            '%d%%' % accuracy,
            '%d%%' % hs_ratio,
        ])

    # Width of each column: widest cell plus one, but never less than five.
    widths = [
        max(max(len(str(cell)) for cell in column) + 1, 5)
        for column in zip(*table)
    ]

    lines = []
    for row in table:
        cells = []
        for index, (cell, width) in enumerate(zip(row, widths)):
            text = str(cell)
            # Column 1 is the player name: left-align it, right-align the rest.
            padded = text.ljust(width) if index == 1 else text.rjust(width)
            cells.append(padded + ' ')
        lines.append(''.join(cells) + '\n')

    # A code fence inside a name would end the block early; replace it.
    body = ''.join(lines).replace('```', '[LOSER]')
    outputs.append([data['channel'], '```\n' + body + '```'])
def slap(data, cmdargs):
    """Handle ``!slap``: run ``amx_slap`` on one randomly chosen player.

    When nobody is connected, a notice is queued for the requesting channel
    instead. ``cmdargs`` is ignored.
    """
    names = [p['name'] for p in sq.players()['players']]
    if not names:
        outputs.append([data['channel'], u"No player to slap :("])
        return

    victim = random.choice(names)
    rcon(SERVER_INFO, RCON_PASSWORD, 'amx_slap "%s"' % victim)
def get_ranking(stats_path=None):
    """Parse the AMX Mod X ``csstats.dat`` file into a sorted player list.

    Args:
        stats_path: File to read. Defaults to
            ``cstrike/addons/amxmodx/data/csstats.dat`` below ``CSTRIKE_DIR``,
            which is the location that used to be hard-coded here.

    Returns:
        A list of dicts with the keys ``name``, ``steamid``, ``tks``,
        ``damage``, ``deaths``, ``kills``, ``shots``, ``hits``, ``hs``,
        ``defusions``, ``defused``, ``plants``, ``explosions`` and
        ``body_hits``, sorted by ``kills / (kills + deaths)``, best first.
        An empty file yields an empty list.

    Raises:
        struct.error: If the file ends in the middle of a record.
    """
    if stats_path is None:
        stats_path = os.path.join(
            CSTRIKE_DIR, 'cstrike/addons/amxmodx/data/csstats.dat'
        )

    def as_sort_value(p):
        total = p['kills'] + p['deaths']
        if total == 0:
            return 0
        return p['kills'] / float(total)

    int_fields = ['tks', 'damage', 'deaths', 'kills', 'shots', 'hits', 'hs',
                  'defusions', 'defused', 'plants', 'explosions']

    ranking = []
    with open(stats_path, 'rb') as f:
        # Skip the 2-byte file header (presumably a format version; its value
        # is not used). Comparing it against "" as before never matches on
        # Python 3, so test the length instead.
        if len(f.read(2)) < 2:
            return ranking

        while True:
            name_len = read_shortint(f)
            if name_len == 0:
                # A zero name length marks the end of the record list.
                break

            player = {'name': read_str(f, name_len)}
            unique_len = read_shortint(f)
            player['steamid'] = read_str(f, unique_len)
            for field in int_fields:
                player[field] = read_int(f)
            # NOTE(review): each entry is the 1-tuple returned by
            # struct.unpack, not a bare int. Kept as is so existing consumers
            # of 'body_hits' see the same shape -- confirm whether ints were
            # intended.
            player['body_hits'] = [
                struct.unpack('i', f.read(4)) for _ in range(9)
            ]
            ranking.append(player)

    ranking.sort(key=as_sort_value, reverse=True)
    return ranking
def b_read(f, size, var_type):
    """Read ``size`` bytes from ``f`` and unpack them with ``struct``.

    Args:
        f: Binary file object to read from.
        size: Number of bytes to read.
        var_type: ``struct`` format string describing those bytes.

    Returns:
        The tuple produced by ``struct.unpack``.
    """
    raw = f.read(size)
    return struct.unpack(var_type, raw)
def read_int(f):
    """Read four bytes from ``f`` and return them as one ``'i'`` (C int) value."""
    (value,) = b_read(f, 4, 'i')
    return value
def read_shortint(f):
    """Read two bytes from ``f`` and return them as one ``'h'`` (C short) value."""
    (value,) = b_read(f, 2, 'h')
    return value
def read_str(f, length):
    """Read a ``length``-byte string field from ``f``, dropping trailing NULs.

    Args:
        f: Binary file object to read from.
        length: Size of the field in bytes.

    Returns:
        The field as a native ``str``. On Python 3 the bytes are decoded as
        UTF-8, with undecodable bytes replaced; on Python 2 the byte string
        is returned as before.

    Raises:
        struct.error: If fewer than ``length`` bytes are available.
    """
    # Unpack the field as one "<length>s" item. The previous per-byte unpack
    # followed by ''.join() raises TypeError on Python 3, where the items are
    # bytes objects rather than str.
    (raw,) = struct.unpack('%ds' % length, f.read(length))
    raw = raw.rstrip(b'\x00')
    if not isinstance(raw, str):
        raw = raw.decode('utf-8', 'replace')
    return raw
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment