Skip to content

Instantly share code, notes, and snippets.

@AaronGhent
Created April 27, 2021 22:17
Show Gist options
  • Save AaronGhent/99b76b98460b457d5b0be4d7c7a5ea8f to your computer and use it in GitHub Desktop.
Save AaronGhent/99b76b98460b457d5b0be4d7c7a5ea8f to your computer and use it in GitHub Desktop.
q3playerdb - Quake 3 Player Database
#! /usr/bin/env python
# >><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><< #
# q3playerdb - Quake 3 Player Database
# Author: Aaron Ghent
# >><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><< #
import socket
import select
import sqlite3
import re
import os
import sys
import getopt
import time
# Target game server in "address:port" form; update() passes it to Quake3().
SERVER = '127.0.0.1:27960'
# RCON password sent with every rcon command issued by update().
RCONPASS = 'rconpass'
class Database:
    """SQLite-backed store of player sightings (alias, GUID, IP).

    Every value that originates outside the program (search strings typed
    on the command line, aliases/GUIDs/IPs reported by the game server) is
    passed to SQLite as a bound parameter.  Only the table name, which is a
    class constant, is interpolated into the SQL text.
    """

    player_table_name = 'players'
    player_table = '''CREATE TABLE IF NOT EXISTS %s(
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        alias TEXT NOT NULL,
        guid TEXT NOT NULL,
        ip TEXT NOT NULL
        );''' % player_table_name

    def __init__(self, database='%s.db' % __file__[:-3]):
        """Open (creating if needed) the database file and the players table.

        database -- path handed to sqlite3.connect(); defaults to the script
                    path with its last three characters replaced by '.db'.
        """
        # isolation_level=None puts the connection in autocommit mode, so
        # inserts are persisted without an explicit commit().
        self.conn = sqlite3.connect(database, isolation_level=None)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute(self.player_table)

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def query(self, query='', params=()):
        """Execute *query* with optional bound *params*; return the cursor."""
        return self.conn.execute(query, params)

    def _select_where(self, condition, value):
        """Run SELECT * on the players table with one bound value."""
        sql = 'SELECT * FROM %s WHERE %s;' % (self.player_table_name,
                                              condition)
        return self.query(sql, (value,))

    def query_guid(self, guid):
        """Return a cursor over rows whose guid equals *guid* exactly."""
        return self._select_where('guid = ?', guid)

    def query_guid8(self, guid):
        """
        query the last 8 chars of the guid
        """
        # 24 single-character wildcards followed by the supplied suffix.
        return self._select_where('guid LIKE ?', '_' * 24 + guid)

    def query_alias(self, player):
        """Return a cursor over rows whose alias contains *player*."""
        # Aliases are stored with double quotes removed (see add_player),
        # so the search term is normalised the same way.
        player = player.replace('"', '')
        return self._select_where('alias LIKE ?', '%' + player + '%')

    def query_ip(self, ip):
        """Return a cursor over rows whose ip equals *ip* exactly."""
        return self._select_where('ip = ?', ip)

    def add_player(self, alias, guid, ip):
        """Insert the (alias, guid, ip) triple unless it is already stored.

        Double quotes are stripped from *alias* before it is compared and
        stored.  Returns True when a row was inserted, False when an
        identical row already existed.
        """
        alias = alias.replace('"', '')
        result = self.query(
            'SELECT COUNT(*) FROM %s WHERE guid = ? AND alias = ? AND ip = ?'
            % self.player_table_name, (guid, alias, ip))
        if int(result.fetchone()[0]) >= 1:
            return False
        self.query(
            'INSERT INTO %s(alias, guid, ip) VALUES(?, ?, ?);'
            % self.player_table_name, (alias, guid, ip))
        return True
class Quake3:
    """Small UDP client that sends commands (including rcon) to a server.

    Packets are the four-byte 0xff prefix followed by the command text and
    a newline.  Text is converted to/from bytes only at the socket
    boundary (latin-1, so every byte value round-trips), which lets the
    same code run on Python 2 and Python 3.
    """

    packet_prefix = '\xff' * 4
    # Header that precedes the text of a printed reply.
    print_header = 'print\n'
    # Replies that mean the rcon command was rejected.
    rcon_errors = ('No rconpassword set on the server.', 'Bad rconpassword.')

    def __init__(self, server, rcon_password=''):
        """Create the UDP socket and point it at *server* ("address:port")."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.set_server(server)
        except Exception:
            # Do not leak the descriptor when the address is unusable.
            self.s.close()
            raise
        self.set_rcon_password(rcon_password)

    def close(self):
        """Close the UDP socket."""
        self.s.close()

    def set_server(self, server):
        """Parse "address:port" and connect the socket to it.

        Raises ValueError when *server* is not of that form or the port is
        not an integer.
        """
        try:
            address, port = server.split(':')
            port = int(port)
        except (AttributeError, ValueError):
            raise ValueError('Server address must be in the format of '
                             '"address:port"')
        self.address, self.port = address, port
        self.s.connect((self.address, self.port))

    def set_rcon_password(self, rcon_password):
        """Remember the password used by send_rcon_cmd()."""
        self.rcon_password = rcon_password

    def send_packet(self, data):
        """Send *data* as one prefixed, newline-terminated packet."""
        packet = '%s%s\n' % (self.packet_prefix, data)
        if not isinstance(packet, bytes):
            # Python 3: sockets need bytes; latin-1 maps '\xff' to 0xff.
            packet = packet.encode('latin-1')
        self.s.send(packet)

    def recv(self, timeout=3):
        """Collect every packet that arrives until *timeout* seconds of quiet.

        Returns the concatenated data with surrounding whitespace stripped.
        Raises Exception('No readable socket') when nothing arrives at all.
        """
        self.s.settimeout(timeout)
        readable = select.select([self.s], [], [self.s], timeout)[0]
        if not readable:
            raise Exception('No readable socket')
        chunks = []
        while readable:
            chunk = self.s.recv(4096)
            if chunk:
                chunks.append(chunk)
            readable = select.select([self.s], [], [self.s], timeout)[0]
        # Strip while still bytes so only ASCII whitespace is removed.
        data = b''.join(chunks).strip()
        if not isinstance(data, str):
            data = data.decode('latin-1')
        return data

    def eval_packet(self, data):
        """Return True when *data* begins with the packet prefix."""
        return data.startswith(self.packet_prefix)

    def send_cmd(self, cmd, timeout=1, retries=3):
        """Send *cmd* and return the reply, retrying up to *retries* times.

        A receive failure or a reply without the packet prefix counts as a
        failed attempt.  Raises Exception('Server response timed out') when
        every attempt fails.
        """
        while retries > 0:
            self.send_packet(cmd)
            try:
                data = self.recv(timeout)
            except Exception:
                # Best effort: treat any receive problem as "no answer".
                data = None
            if data and self.eval_packet(data):
                return data
            retries -= 1
        raise Exception('Server response timed out')

    def _rcon_error(self, response):
        """Return the rcon rejection message in *response*, or None."""
        payload = response
        for header in (self.packet_prefix, self.print_header):
            if payload.startswith(header):
                payload = payload[len(header):]
        payload = payload.strip()
        if payload in self.rcon_errors:
            return payload
        return None

    def send_rcon_cmd(self, cmd):
        """Run *cmd* through rcon and return the raw reply string.

        Raises Exception carrying the server's message when the server
        reports a missing or wrong rcon password.
        """
        r = self.send_cmd('rcon "%s" %s' % (self.rcon_password, cmd))
        error = self._rcon_error(r)
        if error is not None:
            raise Exception(error)
        return r
# Pieces of one player line in the player-list reply.  Raw strings keep the
# regex escapes (\d, \., \() away from Python's own string-escape handling.
_REGEX_GUID = r'(?P<guid>[0-9A-F]{32})\(\w+\)'
_REGEX_IP = (r'(?P<ip>(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
             r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)):\d+')
_REGEX_ALIAS = r'"(?P<alias>.*)"'
# Compiled once at import time instead of on every call.
_PLAYER_LINE_RE = re.compile(
    r'^\^3PunkBuster Server: \d{1,2} ?%s %s .* %s$' % (
        _REGEX_GUID, _REGEX_IP, _REGEX_ALIAS),
    re.IGNORECASE | re.MULTILINE)


def parse_response(response):
    """
    Returns a list of dictionaries like
    { 'ip' : '127.0.0.1', 'guid' : '32char_guid', 'alias' : 'pants ' }

    One dictionary is produced per line of *response* that matches the
    player-line pattern; other lines are ignored.  An empty or None
    response yields an empty list.
    """
    if not response:
        return []
    return [
        {
            'alias': match.group('alias'),
            'guid': match.group('guid'),
            'ip': match.group('ip'),
        }
        for match in _PLAYER_LINE_RE.finditer(response)
    ]
def parse_cpma_colours(str):
    """Return *str* with caret-introduced colour codes removed.

    A '^' marks the following character as a colour code, and both are
    dropped from the result.  A falsy input ('' or None) is returned
    unchanged.

    NOTE(review): the nesting below was reconstructed from a paste that had
    lost its indentation; the placement of the ``count = 0`` lines in
    particular should be confirmed against the original file.
    """
    if not str:
        return str
    parsed = ''
    # bc: True while a '^' has been seen and its code character is pending.
    bc = False
    # count: number of consecutive '^' characters seen so far.
    count = 0
    for ch in str:
        if ch == '^':
            bc = True
            # A caret arriving after two or more consecutive carets is kept
            # as a literal '^' and the run counter starts over.
            if count > 1:
                count = 0
                parsed += ch
            count += 1
        elif ch == '@' or ch == '!':
            # Copied only when not preceded by a caret; note that bc is NOT
            # cleared here, so the character after "^@" / "^!" is dropped
            # too.  Presumably intentional -- TODO confirm.
            if not bc:
                parsed += ch
            count = 0
        else:
            if not bc:
                parsed += ch
            else:
                # This character was the colour code: swallow it.
                bc = False
            count = 0
    return parsed
def add_players(db, players):
    """Store every parsed player in *db* and return how many were new.

    Each entry's alias is passed through parse_cpma_colours() before it is
    handed to db.add_player(); an entry counts when add_player() reports
    that it inserted a row.
    """
    return sum(
        1
        for entry in players
        if db.add_player(parse_cpma_colours(entry['alias']),
                         entry['guid'], entry['ip'])
    )
def output(db_cursor):
    """Print a header line, then one tab-separated line per row.

    Every row must support lookup by the keys 'guid', 'ip' and 'alias'.
    """
    # print(<one expression>) behaves the same as the print statement.
    print('[ GUID ]\t\t\t\t[ IP ]\t\t[ player ]')
    for record in db_cursor:
        fields = (record['guid'], record['ip'], record['alias'])
        print('%s\t%s\t%s' % fields)
def update():
    """Fetch the server's player list and record unseen players.

    Sends the 'pb_sv_plist' rcon command to SERVER, parses the reply,
    stores new (alias, guid, ip) triples and prints two summary lines.
    The socket and the database connection are always released, because
    daemonize() calls this function repeatedly in one process.
    """
    q = Quake3(SERVER, RCONPASS)
    try:
        db = Database()
        try:
            response = q.send_rcon_cmd('pb_sv_plist')
            players = parse_response(response)
            added = add_players(db, players)
        finally:
            # Close through the attribute the class sets in __init__.
            db.conn.close()
    finally:
        q.s.close()
    print('%d player(s) captured' % len(players))
    print('%d player(s) added to the database' % added)
def search(type, str):
    """Look up *str* in the database by *type* and print the matches.

    *type* is one of 'alias', 'guid', 'guid8' or 'ip'; any other value
    prints a short error message instead.
    """
    db = Database()
    lookups = (
        ('alias', db.query_alias),
        ('guid', db.query_guid),
        ('guid8', db.query_guid8),
        ('ip', db.query_ip),
    )
    for kind, lookup in lookups:
        if type == kind:
            output(lookup(str))
            return
    print('type doesnt exist')
def daemonize(arg):
    """Run update() forever, sleeping *arg* seconds between runs.

    Where os.name is not 'nt' the process forks and the parent exits, so
    the loop continues in the child.  On 'nt' the loop simply runs in the
    foreground.  This function never returns.
    """
    print('daemonizing')
    if os.name != 'nt':
        # Flush first, otherwise text still buffered in stdout is written
        # once by the parent and once more by the child.
        sys.stdout.flush()
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    else:
        print('cant daemonize on windows just gonna loop')
    while True:
        try:
            update()
        except Exception as err:
            # A failed poll (for example a server timeout) must not end the
            # loop; report it and try again after the next interval.
            sys.stderr.write('update failed: %s\n' % err)
        time.sleep(arg)
def usage():
    """Print the script name followed by the command-line help text."""
    help_lines = (
        sys.argv[0],
        '>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<',
        '-h or --help for this screen',
        '-u or --update for updating the database with new player info',
        '-d or --daemonize [time] for updating every interval',
        '-s or --search [type] [string to search]',
        '\ttypes: guid guid8 alias ip',
    )
    for line in help_lines:
        print(line)
def main(argv=None):
    """Parse the command line, run the requested action, return exit status.

    argv -- argument list without the program name; defaults to
            sys.argv[1:].

    Returns 0 after a completed action, 2 for an unparsable command line
    or a non-integer interval, and 1 when no action could be carried out
    (the help text is printed in the non-zero cases).
    """
    if argv is None:
        argv = sys.argv[1:]
    try:
        # 'search=' (with '='): the long form takes a value just like '-s'.
        opts, args = getopt.getopt(
            argv, 'huds:', ['help', 'update', 'daemonize', 'search='])
    except getopt.GetoptError:
        usage()
        return 2
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            return 0
        if opt in ('-u', '--update'):
            update()
            return 0
        if opt in ('-d', '--daemonize'):
            interval = 300
            if args:
                try:
                    interval = int(args[0])
                except ValueError:
                    usage()
                    return 2
            daemonize(interval)
            return 0
        # A search needs exactly one positional argument: the search string.
        if opt in ('-s', '--search') and len(args) == 1:
            search(arg, args[0])
            return 0
    usage()
    return 1


if __name__ == '__main__':
    sys.exit(main())
# >><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><<>><< #
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment