Last active
January 5, 2025 12:20
-
-
Save timraay/1a898eb6bc56469680bc572d48008907 to your computer and use it in GitHub Desktop.
Battlemetrics log extraction script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Battlemetrics log extraction script | |
Last updated October 9th 2022 | |
The intention of this script is to allow pulling large number of log lines | |
from the Battlemetrics API and convert them to a text format that looks like | |
that of the Community RCON Tool. For the sake of simplicity and since | |
Battlemetrics doesn't store the same amount of information this may mean that | |
some events are ignored (eg. votekicks, bans) or have placeholder values | |
(Steam64ID in chat events). | |
Running this requires Python 3. Dependencies are automatically installed upon | |
starting the script. | |
""" | |
# Set to True to enable debug information | |
DEBUG = False | |
import subprocess | |
import sys | |
def log(message, *args): | |
if DEBUG: | |
print('[DEBUG]', message % args) | |
def install(package): | |
print(f"Installing '{package}' module...") | |
subprocess.check_call([sys.executable, "-m", "pip", "install", package]) | |
from datetime import datetime | |
try: | |
import requests | |
except ImportError: | |
install('requests') | |
import requests | |
try: | |
import pyperclip | |
except ImportError: | |
install('pyperclip') | |
import pyperclip | |
try: | |
from dateutil.parser import parse | |
except ImportError: | |
install('python-dateutil') | |
from dateutil.parser import parse | |
init = False | |
while not init: | |
try: | |
SERVER_ID = int(input('\nInsert the ID of your server. This is the number you can find in the URL of your server\'s Battlemetrics page.\n\n Server ID: ')) | |
ACCESS_TOKEN = input('\nInsert a temporary access token for the Battlemetrics API.\n\n Access token: ') | |
LOGS_START = parse(input('\nInsert the start time of the logs you want to capture in UTC. The parser will accept pretty lenient answers.\n\n Start time (UTC): ')) | |
print(' -->', LOGS_START.isoformat()) | |
LOGS_END = parse(input('\nInsert the end time of the logs you want to capture in UTC.\n\n End time (UTC): ')) | |
print(' -->', LOGS_END.isoformat()) | |
init = True | |
except: | |
print(' Invalid value! Please try again.') | |
else: | |
if LOGS_START > LOGS_END: | |
print(' Start time is later than the end time! Please try again.') | |
init = False | |
URL = "https://api.battlemetrics.com/activity" | |
PAGE_SIZE = 500 | |
PARAMS = { | |
"version": "^0.1.0", | |
"tagTypeMode": "and", | |
"filter[types][blacklist]": "event:query", | |
"filter[servers]": SERVER_ID, | |
"filter[timestamp]": ':' + LOGS_END.strftime('%Y-%m-%dT%H:%M:%S.000Z'), | |
"page[size]": PAGE_SIZE, | |
"access_token": ACCESS_TOKEN | |
} | |
MESSAGES = { | |
"playerMessage": "CHAT \t{playerName}: {message} (00000000000000000)", | |
"event:addPlayer": "CONNECTED \t{name}", | |
"event:removePlayer": "DISCONNECTED \t{name}", | |
"hll:kill": "KILL \t{killerName}({killerTeam}/{killerSteamID}) -> {victimName}({victimTeam}/{victimSteamID}) with {weapon}", | |
"hll:teamKill": "TEAM KILL \t{killerName}({killerTeam}/{killerSteamID}) -> {victimName}({victimTeam}/{victimSteamID}) with {weapon}", | |
"hll:enteredAdminCam": "CAMERA \t[{playerName} ({steamID})] Entered Admin Camera", | |
"hll:leftAdminCam": "CAMERA \t[{playerName} ({steamID})] Left Admin Camera", | |
"hll:matchEnded": "MATCH ENDED \tMATCH ENDED `{map}` {teamOne} ({teamOneScore} - {teamTwoScore}) {teamTwo}", | |
"hll:matchStart": "MATCH START \tMATCH START {map}", | |
"hll:teamSwitch": "TEAMSWITCH \tTEAMSWITCH {playerName} ({fromTeam} > {toTeam})", | |
} | |
def get_datetime(time_str): | |
return datetime.strptime(time_str.split('.')[0], '%Y-%m-%dT%H:%M:%S') | |
def pull_logs(): | |
def _pull(url, params = dict(access_token=ACCESS_TOKEN)): | |
res = requests.get(url, params=params) | |
log('Got status %s from %s', res.status_code, res.url) | |
data = res.json() | |
if data.get('errors'): | |
err = data['errors'][0] | |
raise Exception(f"{err}") | |
# log('Response is %s', data) | |
logs = data['data'] | |
next = data['links'].get('next') | |
filtered_logs = list() | |
for log_data in logs: | |
time = get_datetime(log_data["attributes"]['timestamp']) | |
if time < LOGS_START: | |
break | |
filtered_logs.append(log_data) | |
print('... Collected', len(filtered_logs), 'logs') | |
log('Num logs gathered: %s / %s - Request more? %s', len(filtered_logs), PAGE_SIZE, len(filtered_logs) == PAGE_SIZE) | |
return filtered_logs, next | |
all_logs = [] | |
logs, url = _pull(URL, params=PARAMS) | |
all_logs += logs | |
while len(logs) == PAGE_SIZE: | |
logs, url = _pull(url) | |
all_logs += logs | |
log('Returning a total of %s logs', len(logs)) | |
return all_logs | |
UNKNOWN_EVENTS = set() | |
def process_log(log): | |
log = log["attributes"] | |
type = log["messageType"] | |
timestamp = log["timestamp"] | |
data = log["data"] | |
time = get_datetime(timestamp) | |
message = MESSAGES.get(type) | |
if message: | |
return str(time) + '\t' + message.format(**data) | |
else: | |
UNKNOWN_EVENTS.add(type) | |
try: | |
print() | |
logs = list() | |
data = pull_logs() | |
for log_ in data: | |
text = process_log(log_) | |
if text: | |
logs.append(text) | |
if UNKNOWN_EVENTS: | |
log('Skipped %s events that were not recognized: %s', len(UNKNOWN_EVENTS), ', '.join(UNKNOWN_EVENTS)) | |
pyperclip.copy('\n'.join(logs)) | |
print('\nCopied', len(logs), 'logs to clipboard!') | |
except Exception as e: | |
print('\n####################################') | |
print('\n An error occured!') | |
print(f' {e.__class__.__name__}: {str(e)}\n') | |
if DEBUG: | |
import traceback | |
print(traceback.format_exc()) | |
input('Press Enter to close.') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment