Created
April 27, 2021 22:17
-
-
Save AaronGhent/99b76b98460b457d5b0be4d7c7a5ea8f to your computer and use it in GitHub Desktop.
q3playerdb - Quake 3 Player Database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python
# >><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><< #
# q3playerdb - Quake 3 Player Database
# Author: Aaron Ghent
# >><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><< #
import socket | |
import select | |
import sqlite3 | |
import re | |
import os | |
import sys | |
import getopt | |
import time | |
# Address of the Quake 3 server to poll, in "address:port" form.  The
# Q3PLAYERDB_SERVER environment variable overrides the built-in default so
# the script does not have to be edited per deployment.
SERVER = os.environ.get('Q3PLAYERDB_SERVER', '127.0.0.1:27960')
# rcon password sent with every rcon command.  Set Q3PLAYERDB_RCONPASS to
# keep the real password out of the source file.
RCONPASS = os.environ.get('Q3PLAYERDB_RCONPASS', 'rconpass')
class Database:
    """SQLite store of (alias, guid, ip) player sightings.

    The connection is opened in autocommit mode (``isolation_level=None``)
    and rows are returned as ``sqlite3.Row`` so columns can be read by name.
    All values are passed to SQLite as bound parameters; only the table
    name, a class constant, is interpolated into the SQL text.
    """

    player_table_name = 'players'
    player_table = '''CREATE TABLE IF NOT EXISTS %s(
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        alias TEXT NOT NULL,
        guid TEXT NOT NULL,
        ip TEXT NOT NULL
    );''' % player_table_name

    def __init__(self, database=os.path.splitext(__file__)[0] + '.db'):
        """Open (creating if needed) the database file and the players table.

        The default path is this script's path with its extension replaced
        by ``.db``; ``splitext`` keeps that correct when the script has no
        ``.py`` suffix.
        """
        self.conn = sqlite3.connect(database, isolation_level=None)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute(self.player_table)

    def query(self, query='', params=()):
        """Execute *query* with optional bound *params*; return the cursor."""
        return self.conn.execute(query, params)

    def query_guid(self, guid):
        """Return a cursor over rows whose guid equals *guid*."""
        return self.query(
            'SELECT * FROM %s WHERE guid = ?;' % self.player_table_name,
            (guid,))

    def query_guid8(self, guid):
        """
        query the last 8 chars of the guid
        """
        # 24 single-character wildcards followed by the given tail.
        return self.query(
            'SELECT * FROM %s WHERE guid LIKE ?;' % self.player_table_name,
            ('_' * 24 + guid,))

    def query_alias(self, player):
        """Return a cursor over rows whose alias contains *player*."""
        # add_player() stores aliases without double quotes, so drop them
        # from the search term as well.
        player = player.replace('"', '')
        return self.query(
            'SELECT * FROM %s WHERE alias LIKE ?;' % self.player_table_name,
            ('%' + player + '%',))

    def query_ip(self, ip):
        """Return a cursor over rows whose ip equals *ip*."""
        return self.query(
            'SELECT * FROM %s WHERE ip = ?;' % self.player_table_name,
            (ip,))

    def add_player(self, alias, guid, ip):
        """Insert the (alias, guid, ip) triple unless it is already stored.

        Double quotes are removed from *alias* before comparing and storing.
        Returns True when a row was inserted, False when the exact triple
        already existed.
        """
        alias = alias.replace('"', '')
        cursor = self.query(
            'SELECT COUNT(*) FROM %s WHERE guid = ? AND alias = ? AND ip = ?'
            % self.player_table_name,
            (guid, alias, ip))
        if int(cursor.fetchone()[0]) >= 1:
            return False
        self.query(
            'INSERT INTO %s(alias, guid, ip) VALUES(?, ?, ?);'
            % self.player_table_name,
            (alias, guid, ip))
        return True
class Quake3:
    """Small UDP client that sends prefixed command packets to a server.

    Packets are handled as text inside the class.  Conversion to and from
    bytes happens only at the socket boundary using latin-1, which maps
    every byte value to the code point of the same number, so the class
    behaves the same on Python 2 and Python 3.
    """

    # Every packet sent starts with, and every accepted reply must start
    # with, these four 0xFF characters.
    packet_prefix = '\xff' * 4

    # Replies whose body is exactly one of these are reported as errors by
    # send_rcon_cmd().
    rcon_errors = ('No rconpassword set on the server.', 'Bad rconpassword.')

    def __init__(self, server, rcon_password=''):
        """Create the UDP socket, connect it to *server*, store the password."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.set_server(server)
        self.set_rcon_password(rcon_password)

    def set_server(self, server):
        """Parse "address:port" and connect the socket to it.

        Raises Exception when *server* does not split into exactly two
        parts on ':'.  A non-numeric port raises ValueError from int().
        """
        try:
            self.address, self.port = server.split(':')
        except (AttributeError, ValueError):
            raise Exception('Server address must be in the format of '
                            '"address:port"')
        self.port = int(self.port)
        self.s.connect((self.address, self.port))

    def set_rcon_password(self, rcon_password):
        """Store the password used by send_rcon_cmd()."""
        self.rcon_password = rcon_password

    def send_packet(self, data):
        """Send *data* prefixed with packet_prefix and terminated by newline."""
        packet = '%s%s\n' % (self.packet_prefix, data)
        # On Python 3 the formatted packet is text and sockets need bytes;
        # on Python 2 it already is a byte string and is sent unchanged.
        if not isinstance(packet, bytes):
            packet = packet.encode('latin-1')
        self.s.send(packet)

    def recv(self, timeout=3):
        """Collect every datagram that arrives until *timeout* passes idle.

        Returns the concatenated text with leading/trailing ASCII
        whitespace removed.  Raises Exception('No readable socket') when
        nothing arrives within the first *timeout* seconds.
        """
        self.s.settimeout(timeout)
        readable, _, _ = select.select([self.s], [], [self.s], timeout)
        if not readable:
            raise Exception('No readable socket')
        chunks = []
        while readable:
            chunk = self.s.recv(4096)
            if not isinstance(chunk, str):
                chunk = chunk.decode('latin-1')
            if chunk:
                chunks.append(chunk)
            readable, _, _ = select.select([self.s], [], [self.s], timeout)
        # Explicit ASCII set: on Python 3 a bare strip() would also remove
        # latin-1 characters such as '\xa0' that Python 2 left alone.
        return ''.join(chunks).strip(' \t\n\r\x0b\x0c')

    def eval_packet(self, data):
        """Return True when *data* starts with packet_prefix."""
        return data.startswith(self.packet_prefix)

    def send_cmd(self, cmd, timeout=1, retries=3):
        """Send *cmd* and return the first valid reply.

        The command is re-sent up to *retries* times; a receive failure or
        a reply without the packet prefix counts as a failed attempt.
        Raises Exception('Server response timed out') when every attempt
        fails (or *retries* is not positive).
        """
        while retries > 0:
            self.send_packet(cmd)
            try:
                data = self.recv(timeout)
            except Exception:
                # Deliberate best effort: treat any receive failure as a
                # lost packet and retry.  KeyboardInterrupt/SystemExit are
                # not Exception subclasses and still propagate.
                data = None
            if data and self.eval_packet(data):
                return data
            retries -= 1
        raise Exception('Server response timed out')

    def send_rcon_cmd(self, cmd, timeout=1, retries=3):
        """Send *cmd* as an rcon command and return the raw reply text.

        *timeout* and *retries* are forwarded to send_cmd().  Raises
        Exception with the server's message when the reply body is one of
        rcon_errors.
        """
        r = self.send_cmd('rcon "%s" %s' % (self.rcon_password, cmd),
                          timeout, retries)
        # The reply is one string: prefix, a header line, then the body.
        # Compare the whole body, not a single character of the reply.
        payload = r[len(self.packet_prefix):]
        body = payload.partition('\n')[2].strip()
        if body in self.rcon_errors:
            raise Exception(body)
        return r
# One player line of the captured server output.  Raw strings keep the
# regex escapes (\^, \d, \(, \w, \.) away from Python's own string escapes.
_PB_REGEX_GUID = r'(?P<guid>[0-9A-F]{32})\(\w+\)'
_PB_REGEX_IP = (r'(?P<ip>(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
                r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)):\d+')
_PB_REGEX_ALIAS = r'"(?P<alias>.*)"'
_PB_PLAYER_LINE = re.compile(
    r'^\^3PunkBuster Server: \d{1,2} ?%s %s .* %s$' % (
        _PB_REGEX_GUID, _PB_REGEX_IP, _PB_REGEX_ALIAS),
    re.IGNORECASE | re.MULTILINE)


def parse_response(response):
    """
    Returns a list of dictionaries like
    { 'ip' : '127.0.0.1', 'guid' : '32char_guid', 'alias' : 'pants ' }

    One dictionary is produced per line of *response* that matches the
    player-line pattern, in order of appearance; lines that do not match
    are ignored.  The alias is returned exactly as it appears between the
    double quotes (colour codes included).
    """
    return [
        {
            'alias': match.group('alias'),
            'guid': match.group('guid'),
            'ip': match.group('ip'),
        }
        for match in _PB_PLAYER_LINE.finditer(response)
    ]
def parse_cpma_colours(str):
    """Return *str* with caret colour codes removed.

    A '^' starts a colour code and is dropped together with the next
    ordinary character.  Details of the state machine:

    * In a run of consecutive carets, every third caret is kept as a
      literal '^'.
    * '@' and '!' are kept outside a colour code.  Inside one they are
      dropped without ending the code, so the following character is
      dropped as well.

    Falsy input (None or '') is returned unchanged.
    """
    if not str:
        return str
    kept = []          # collected output characters, joined once at the end
    in_code = False    # a '^' was seen and its code character not yet eaten
    caret_run = 0      # number of consecutive '^' seen so far
    for ch in str:
        if ch == '^':
            in_code = True
            if caret_run > 1:
                caret_run = 0
                kept.append(ch)
            caret_run += 1
        elif ch in '@!':
            if not in_code:
                kept.append(ch)
            caret_run = 0
        else:
            if in_code:
                # This character is the colour code itself: swallow it.
                in_code = False
            else:
                kept.append(ch)
            caret_run = 0
    return ''.join(kept)
def add_players(db, players):
    """Store each parsed player in *db*; return how many rows were new.

    Each alias is passed through parse_cpma_colours() before being handed
    to db.add_player(), whose truthy return marks a newly inserted row.
    """
    return sum(
        1
        for entry in players
        if db.add_player(parse_cpma_colours(entry['alias']),
                         entry['guid'], entry['ip'])
    )
def output(db_cursor):
    """Print a header line, then one tab-separated line per row.

    *db_cursor* is any iterable of rows that support lookup by the keys
    'guid', 'ip' and 'alias'.
    """
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print('[ GUID ]\t\t\t\t[ IP ]\t\t[ player ]')
    for row in db_cursor:
        print('%s\t%s\t%s' % (row['guid'], row['ip'], row['alias']))
def update():
    """Ask the server for its player list and record it in the database.

    Sends the 'pb_sv_plist' rcon command to SERVER, parses the reply,
    stores the players and prints how many were captured and how many
    were new.  The socket and the database connection are closed before
    returning, including when an error is raised, so repeated calls do not
    accumulate open handles.
    """
    q = Quake3(SERVER, RCONPASS)
    try:
        db = Database()
        try:
            response = q.send_rcon_cmd('pb_sv_plist')
            players = parse_response(response)
            added = add_players(db, players)
        finally:
            db.conn.close()
    finally:
        q.s.close()
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print('%d player(s) captured' % len(players))
    print('%d player(s) added to the database' % added)
def search(type, str):
    """Look *str* up in the database and print the matching rows.

    *type* selects the lookup: 'alias', 'guid', 'guid8' or 'ip'.  Any
    other value prints 'type doesnt exist' and nothing else.
    """
    db = Database()
    lookups = {
        'alias': db.query_alias,
        'guid': db.query_guid,
        'guid8': db.query_guid8,
        'ip': db.query_ip,
    }
    lookup = lookups.get(type)
    if lookup is None:
        # Single-argument print(...) is valid on Python 2 and Python 3.
        print('type doesnt exist')
        return
    output(lookup(str))
def daemonize(arg):
    """Run update() forever, sleeping *arg* seconds between runs.

    Except where os.name is 'nt', the process forks first and the parent
    exits, leaving the child to do the polling.  On 'nt' the loop runs in
    the current process.  This function does not return.
    """
    print('daemonizing')
    if os.name != 'nt':
        pid = os.fork()
        if pid > 0:
            # Parent: hand over to the forked child.
            sys.exit(0)
    else:
        print('cant daemonize on windows just gonna loop')
    while True:
        try:
            update()
        except Exception as exc:
            # One failed poll (for example a server timeout) must not end
            # the loop; report it and try again after the usual interval.
            print('update failed: %s' % exc)
        time.sleep(arg)
def usage():
    """Print the script name followed by the command-line help text."""
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print(sys.argv[0])
    print('>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<')
    print('-h or --help for this screen')
    print('-u or --update for updating the database with new player info')
    print('-d or --daemonize [time] for updating every interval')
    print('-s or --search [type] [string to search]')
    print('\ttypes: guid guid8 alias ip')
# Command-line entry point.  The first recognised option decides what runs:
#   -h/--help                 print usage, exit 0
#   -u/--update               poll the server once, exit 0
#   -d/--daemonize [time]     poll forever every [time] seconds (default 300)
#   -s/--search TYPE STRING   print matching rows, exit 0
# Unknown options exit with status 2; anything else falls through to the
# usage text and exit status 1.
if __name__ == '__main__':
    try:
        # 's:' / 'search=' both take the search type as their argument;
        # without the '=' the long form could never receive it.
        opts, args = getopt.getopt(sys.argv[1:], 'huds:',
                                   ['help', 'update', 'daemonize', 'search='])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-u', '--update'):
            update()
            sys.exit()
        elif opt in ('-d', '--daemonize'):
            interval = 300
            if args:
                try:
                    interval = int(args[0])
                except ValueError:
                    # A non-numeric interval is a usage error, not a crash.
                    usage()
                    sys.exit(2)
            daemonize(interval)
            sys.exit()
        elif opt in ('-s', '--search'):
            # Exactly one positional argument: the string to search for.
            if len(args) == 1:
                search(arg, args[0])
                sys.exit()
    usage()
    sys.exit(1)
# >><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><< # | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment