Skip to content

Instantly share code, notes, and snippets.

@kai4785
Created October 1, 2018 14:38
Show Gist options
  • Save kai4785/62377f827120440c807c7a9ccf09bb36 to your computer and use it in GitHub Desktop.
Save kai4785/62377f827120440c807c7a9ccf09bb36 to your computer and use it in GitHub Desktop.
Takp DPS parser
#!/usr/bin/python3.6
'''
takp_dps.py
This script will parse and follow (see Python-Tail below) a TAKP client
generated log file, and generate Battle Reports.
Battle reports are printed to the screen 10 seconds (keep_alive) after battle
messages have stopped. All timestamps come from the log file, so if it's been
10 seconds, but you haven't seen an update, say something to your group.
Examples:
Parse Meriadoc's file, change 'You' to Meriadoc in the log entries, and follow the file:
takp_dps.py -l ~/.takp/drive_e/TAKP/eqlog_Meriadoc_loginse.txt --me Meriadoc -f
Show everything for Meriadoc that's happened today.
takp_dps.py -l ~/.takp/drive_e/TAKP/eqlog_Meriadoc_loginse.txt --history --me Meriadoc -s 00:00:00
History and follow can be used together:
takp_dps.py -l ~/.takp/drive_e/TAKP/eqlog_Meriadoc_loginse.txt --history --me Meriadoc -s 00:00:00 -f
'''
'''
Python-Tail - Unix tail follow implementation in Python.
python-tail can be used to monitor changes to a file.
Example:
import tail
# Create a tail instance
t = tail.Tail('file-to-be-followed')
# Register a callback function to be called when a new line is found in the followed file.
# If no callback function is registered, new lines are printed to standard out.
t.register_callback(callback_function)
# Follow the file with 5 seconds as sleep time between iterations.
# If sleep time is not provided 1 second is used as the default time.
t.follow(s=5) '''
# Author - Kasun Herath <kasunh01 at gmail.com>
# Source - https://github.com/kasun/python-tail
import os
import sys
import time
import re
from datetime import datetime, timedelta, date
from argparse import ArgumentParser as Argp
from contextlib import suppress
class Tail(object):
    ''' Follows a file the way ``tail -f`` does, handing new lines to a callback. '''

    def __init__(self, tailed_file):
        ''' Set up a follower for one file.
        The file is validated first; until a callback is registered, new
        lines are written to standard out.
        Arguments:
        tailed_file - File to be followed. '''
        self.check_file_validity(tailed_file)
        self.tailed_file = tailed_file
        self.callback = sys.stdout.write

    def follow(self, s=1):
        ''' Loop forever, passing every newly appended line to the callback.
        Arguments:
        s - Number of seconds to sleep when no new line is available; Defaults to 1. '''
        with open(self.tailed_file) as stream:
            # Start at the current end of the file; existing content is skipped.
            stream.seek(0, 2)
            while True:
                resume_at = stream.tell()
                text = stream.readline()
                if text:
                    self.callback(text)
                    continue
                # Nothing new yet: go back to where we were and wait.
                stream.seek(resume_at)
                time.sleep(s)

    def register_callback(self, func):
        ''' Replace the current callback with the provided function. '''
        self.callback = func

    def check_file_validity(self, file_):
        ''' Raise TailError unless file_ exists, is readable and is not a directory. '''
        # Each entry is (lazy check, complaint); checks run in order and the
        # first one that fails decides the error message.
        checks = (
            (lambda: os.access(file_, os.F_OK), "File '%s' does not exist"),
            (lambda: os.access(file_, os.R_OK), "File '%s' not readable"),
            (lambda: not os.path.isdir(file_), "File '%s' is a directory"),
        )
        for passes, complaint in checks:
            if not passes():
                raise TailError(complaint % (file_))
class TailError(Exception):
    ''' Error carrying a human-readable message in ``message``.
    Arguments:
    msg - Description of the problem; returned by str(). '''

    def __init__(self, msg):
        # Hand the message to Exception as well, so that ``args``, ``repr()``
        # and pickling all carry it instead of seeing an empty exception.
        super().__init__(msg)
        self.message = msg

    def __str__(self):
        # str() must return a string even if a non-string message was given.
        return str(self.message)
class Fight(object):
    '''
    Running totals for the hits recorded between one attacker and one target.
    start and end are timestamps whose difference provides total_seconds()
    (datetime objects work); hits and _damage only count hits below the
    over-kill threshold.
    '''

    # NOTE: Killing blows are purposefully over-kill, and they don't help show
    # you your dps, so hits at or above this value are left out of the totals.
    # We should track them separately.
    OVERKILL_DAMAGE = 32000

    def __init__(self, start):
        ''' Open a fight at timestamp start with no hits recorded. '''
        self.start = start
        self.end = self.start
        self._damage = 0
        self.hits = 0

    def melee(self, time, verb, damage):
        '''
        Record one hit.
        time   - timestamp of the hit; always becomes the new end of the fight.
        verb   - attack verb from the log line (not used in the totals).
        damage - damage dealt; ignored when it reaches OVERKILL_DAMAGE.
        '''
        self.end = time
        if damage < self.OVERKILL_DAMAGE:
            self._damage += damage
            self.hits += 1

    @property
    def seconds(self):
        ''' Whole seconds between start and end, never 0. '''
        elapsed = int((self.end - self.start).total_seconds())
        # A duration that truncates to 0 is reported as 1 so the per-second
        # rates below never divide by zero.
        return elapsed if elapsed else 1

    @property
    def damage(self):
        ''' Total counted damage as an int. '''
        return int(self._damage)

    @property
    def dps(self):
        ''' Damage per second. '''
        return self._damage / self.seconds

    @property
    def dpa(self):
        ''' Damage per counted hit; 0 when no hit was counted. '''
        if self.hits == 0:
            return 0
        return self._damage / self.hits

    @property
    def hps(self):
        ''' Counted hits per second. '''
        return self.hits / self.seconds

    def __str__(self):
        # The hit rate lives in ``hps``; there is no ``aps`` attribute.
        return f'{self.seconds}s, {self.hits}({self.hps}/s) hits, {self._damage}({self.dps}/s, {self.dpa}/a) damage'
class Battle(object):
    '''
    One battle: every Fight (attacker -> target) and every death recorded
    between start and end.
    _melee  - dict of attacker -> dict of target -> Fight.
    _deaths - dict of target -> number of deaths.
    expire  - timestamp after which the battle is considered over.
    '''

    def __init__(self, time):
        ''' Open a battle whose start (and initial end) is timestamp time. '''
        self._melee = dict()
        self._deaths = dict()
        self.start = time
        self.keep_alive(self.start)

    def keep_alive(self, time, bump = timedelta(seconds=10)):
        ''' Move end to time and push expire to bump (default 10 seconds) past it. '''
        self.end = time
        self.expire = self.end + bump

    def melee(self, time, attacker, target, verb, damage):
        ''' Record a hit by attacker on target, creating the Fight on first contact. '''
        self.keep_alive(time)
        fights = self._melee.setdefault(attacker, dict())
        if target not in fights:
            fights[target] = Fight(time)
        fights[target].melee(time, verb, damage)

    def magic(self, time, target, verb, damage):
        '''
        Record a non-melee hit on target.
        The log line names no attacker, so the hit is filed under a synthetic
        attacker named after the damage amount.
        '''
        self.melee(time, f'Spell/DS({damage})', target, verb, damage)

    @property
    def seconds(self):
        ''' Whole seconds between start and end, never 0. '''
        elapsed = int((self.end - self.start).total_seconds())
        return elapsed if elapsed else 1

    def death(self, target, slayer = ''):
        ''' Count one death of target. slayer is accepted but not recorded. '''
        self._deaths[target] = self._deaths.get(target, 0) + 1

    @staticmethod
    def _rate(amount, per):
        ''' amount / per, or 0 when per is zero. '''
        return amount / per if per else 0

    def report(self, time):
        '''
        Print the battle report: one row per fight, a Total row for attackers
        with more than one fight, then the death counts.
        time is not used; it is kept so callers need not change.
        '''
        print(f'Battle report {self.seconds}s [{self.start} - {self.end}]:')
        print('Melee:')
        header_format = '{:<25s} {:<25s} {:>4s} {:>4s} {:>5s} {:>6s} {:>6s} {:>6s}'
        fight_format = '{:<25s} {:<25s} {:>4d} {:>4d} {:>5.2f} {:>6d} {:>6.2f} {:>6.2f}'
        break_str = '----------------------------------------------------------------------------------------'
        print(header_format.format('(N)PC', 'Target', 'Sec', 'Hits', 'h/s', 'Damage', 'd/a', 'd/s'))
        print(break_str)
        for _attacker, targets in self._melee.items():
            # The attacker name is printed on the first row only.
            attacker = _attacker
            total_hits = 0
            total_damage = 0
            total_seconds = 0
            total_fights = 0
            for target, fight in targets.items():
                total_hits += fight.hits
                total_damage += fight.damage
                total_seconds += fight.seconds
                total_fights += 1
                print(fight_format.format(attacker, target, fight.seconds, fight.hits, fight.hps, fight.damage, fight.dpa, fight.dps))
                attacker = ''
            if total_fights > 1:
                # total_hits is 0 when every hit was over-kill, so the ratios
                # go through _rate rather than dividing directly.
                total_hps = self._rate(total_hits, total_seconds)
                total_dpa = self._rate(total_damage, total_hits)
                total_dps = self._rate(total_damage, total_seconds)
                print(fight_format.format(attacker, 'Total', total_seconds, total_hits, total_hps, total_damage, total_dpa, total_dps))
            print(break_str)
        print()
        print('Deaths:')
        print('{:<25s} {:<6s}'.format('Target', 'Deaths'))
        print(break_str)
        for target, times in self._deaths.items():
            print('{:<25s} {:<6d}'.format(target, times))
        print(break_str)
        print()
class Process(object):
    '''
    Callable that consumes log lines one at a time, groups combat messages
    into a Battle and prints the battle report once the battle has ended.
    Melee messages are:
    'attacker' 'verb' 'target' for 'xx' points of damage.
    Damage Shield and Proc messages are:
    'target' was hit by non-melee for 'xx' points of damage.
    '''

    # Each line carries a 24 character timestamp in columns 1-24 and the
    # message from column 27 on (presumably "[timestamp] message").
    _STAMP_FORMAT = '%a %b %d %H:%M:%S %Y'

    def __init__(self, pc_list, you, since):
        '''
        pc_list - regular expression fragments naming the (N)PCs to match,
                  e.g. ['You', 'Mars', 'Khaurgar`s warder']; they are joined
                  with '|'.
        you     - name that replaces 'You'/'YOU' in the recorded fights.
        since   - lines stamped earlier than this are ignored. A datetime, a
                  date string in one of the --since formats, or None for no
                  limit.
        Raises ValueError when since is a string that cannot be parsed.
        '''
        self.battle = None
        self.attacker = ''
        self.target = ''
        self.pc_list = pc_list
        self.you = you
        self.since = self._coerce_since(since)
        self.pc_regexp = '|'.join(self.pc_list)
        self.melee_verbs = "hit|slash|crush|pierce|kick|bash|maul|gore|gores|slashes|crushes|hits|kicks|bashes|bites|pierces|mauls|backstab|backstabs"
        self.melee_reg = re.compile(fr'^({self.pc_regexp}) ({self.melee_verbs}) (.*) for ([0-9]+) points of damage\.')
        self.magic_re = re.compile(fr'(.*) was (hit by non-melee) for ([0-9]+) points of damage\.')
        self.death_re1 = re.compile(fr'({self.pc_regexp}) have slain ({self.pc_regexp})!')
        # (?:...) is non-capturing, so group 2 is the slayer rather than the
        # word "has"/"have".
        self.death_re2 = re.compile(fr'({self.pc_regexp}) (?:has|have) been slain by ({self.pc_regexp})!')
        self.death_re3 = re.compile(fr'({self.pc_regexp}) died\.')

    @staticmethod
    def _coerce_since(since):
        ''' Return since as a datetime (or None), parsing it when it is a string. '''
        if since is None or isinstance(since, datetime):
            return since
        today = date.today()
        # (format, fields to fill in from today's date when the format omits them)
        attempts = (
            ('%a %b %d %H:%M:%S %Y', {}),
            ('%b %d %H:%M:%S %Y', {}),
            ('%b %d %H:%M:%S', {'year': today.year}),
            ('%b %d', {'year': today.year}),
            ('%H:%M:%S', {'year': today.year, 'month': today.month, 'day': today.day}),
        )
        for fmt, fill in attempts:
            try:
                return datetime.strptime(since, fmt).replace(**fill)
            except ValueError:
                continue
        raise ValueError(f'Unable to parse date string [{since}]')

    def _current_battle(self, stamp):
        ''' Return the running battle, starting one at stamp if there is none. '''
        if not self.battle:
            self.battle = Battle(stamp)
        return self.battle

    def _named(self, name, alias):
        ''' Return self.you when name is the log's alias for the player, else name. '''
        return self.you if name == alias else name

    def __call__(self, line):
        '''
        Handle one log line. Combat and death lines feed the current battle
        (starting one if needed); any other line ends the battle and prints
        its report once the battle has expired or a zone load is seen.
        '''
        # Skip blank or mal-formed lines
        try:
            stamp = datetime.strptime(line[1:25], self._STAMP_FORMAT)
        except (ValueError, TypeError):
            return
        if self.since is not None and stamp < self.since:
            return
        # Drop only the line terminator; a final line without a newline keeps
        # its last character.
        msg = line[27:].rstrip('\r\n')

        magic = self.magic_re.search(msg)
        if magic:
            target, verb, damage = magic.group(1, 2, 3)
            self._current_battle(stamp).magic(stamp, target, verb, int(damage))
            return

        melee = self.melee_reg.search(msg)
        if melee:
            attacker, verb, target, damage = melee.group(1, 2, 3, 4)
            # The attacker alias is 'You' but the target alias is 'YOU'.
            attacker = self._named(attacker, 'You')
            target = self._named(target, 'YOU')
            self._current_battle(stamp).melee(stamp, attacker, target, verb, int(damage))
            return

        death = self.death_re1.search(msg)
        if death:
            slayer, target = death.group(1, 2)
            self._current_battle(stamp).death(self._named(target, 'You'), self._named(slayer, 'You'))
            return

        death = self.death_re2.search(msg)
        if death:
            target, slayer = death.group(1, 2)
            self._current_battle(stamp).death(self._named(target, 'You'), self._named(slayer, 'You'))
            return

        death = self.death_re3.search(msg)
        if death:
            target = death.group(1)
            self._current_battle(stamp).death(self._named(target, 'You'))
            return

        # End the battle
        if self.battle and (msg == 'LOADING, PLEASE WAIT...' or stamp > self.battle.expire):
            self.battle.report(stamp)
            self.battle = None
# Command line entry point: parse the options, work out the --since cut-off,
# then replay the log (--history) and/or follow it for new lines (--follow).
argp = Argp(description='Process TAKP logs')
argp.add_argument('--me', '-m', help='Who is "You" in the logs', default='Me')
# With action='append', argparse appends to a non-empty default instead of
# replacing it, which would leave the match-everything pattern in place next
# to the user's names. Default to None and fall back to the wildcard below.
argp.add_argument('--pc', '-p', action='append', help='Filter to specific (Non-)Player Characters to search for in the logs', default=None)
argp.add_argument('--log', '-l', help='Logfile to watch')
argp.add_argument('--history', help='Read the whole log history', action='store_true')
argp.add_argument('--follow', '-f', help='Follow the log file', action='store_true')
argp.add_argument('--since', '-s', help='Parse logs since', default='Thu Jan 01 00:00:00 1970')
args = argp.parse_args()
if (args.history or args.follow) and not args.log:
    # Without this the missing path surfaces as a TypeError traceback.
    argp.error('--log/-l is required with --history or --follow')

since = None
today = date.today()
# Accepted --since formats, most specific first, each paired with the fields
# taken from today's date when the format leaves them out.
since_formats = (
    ('%a %b %d %H:%M:%S %Y', {}),
    ('%b %d %H:%M:%S %Y', {}),
    ('%b %d %H:%M:%S', {'year': today.year}),
    ('%b %d', {'year': today.year}),
    ('%H:%M:%S', {'year': today.year, 'month': today.month, 'day': today.day}),
)
for since_format, since_fill in since_formats:
    with suppress(ValueError):
        since = datetime.strptime(args.since, since_format).replace(**since_fill)
        break
if not since:
    raise Exception(f'Unable to parse date string [{args.since}]')

# Hand over the parsed datetime, not the raw option string.
process = Process(pc_list = args.pc or ['[A-Za-z `]+'], you = args.me, since=since)
if args.history:
    with open(args.log, "r") as fd:
        # Iterate the file lazily instead of loading the whole log first.
        for line in fd:
            process(line)
if args.follow:
    t = Tail(args.log)
    t.register_callback(process)
    t.follow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment