Skip to content

Instantly share code, notes, and snippets.

@w1ndy
Last active March 29, 2021 23:24
Show Gist options
  • Save w1ndy/ee18a9f73a277642806fe3c0d5d55dcd to your computer and use it in GitHub Desktop.
Save w1ndy/ee18a9f73a277642806fe3c0d5d55dcd to your computer and use it in GitHub Desktop.
Analyzing chat log 'server_chat_log.txt' produced by Don't Starve Together
#!/usr/bin/python3
import sys
import re
import operator
import json
from enum import Enum
class EventType(Enum):
JOIN = 1
LEAVE = 2
SAY = 3
DEATH = 4
RESURRECT = 5
SKIN = 6
ROLL = 7
class Event:
def __init__(self, time, evtype, player):
self.time = time
self.evtype = evtype
self.player = player
class JoinEvent(Event):
def __init__(self, time, player):
super().__init__(time, EventType.JOIN, player)
@staticmethod
def parse_string(line):
result = re.match(r'\[(\d+):(\d+):(\d+)\]: \[Join Announcement\] (.+)$', line)
if result:
print(result.group(0))
hh, mm, ss = int(result.group(1)), int(result.group(2)), int(result.group(3))
return JoinEvent(hh * 3600 + mm * 60 + ss, result.group(4))
class LeaveEvent(Event):
def __init__(self, time, player):
super().__init__(time, EventType.LEAVE, player)
@staticmethod
def parse_string(line):
result = re.match(r'\[(\d+):(\d+):(\d+)\]: \[Leave Announcement\] (.+)$', line)
if result:
print(result.group(0))
hh, mm, ss = int(result.group(1)), int(result.group(2)), int(result.group(3))
return LeaveEvent(hh * 3600 + mm * 60 + ss, result.group(4))
class SayEvent(Event):
def __init__(self, time, player, content):
super().__init__(time, EventType.SAY, player)
self.content = content
@staticmethod
def parse_string(line):
result = re.match(r'\[(\d+):(\d+):(\d+)\]: \[Say\] \(.+\) (.+): (.+)$', line)
if result:
print(result.group(0))
hh, mm, ss = int(result.group(1)), int(result.group(2)), int(result.group(3))
return SayEvent(hh * 3600 + mm * 60 + ss, result.group(4), result.group(5))
class DeathEvent(Event):
def __init__(self, time, player, reason):
super().__init__(time, EventType.DEATH, player)
self.reason = reason
@staticmethod
def parse_string(line):
result = re.match(r'\[(\d+):(\d+):(\d+)\]: \[Death Announcement\] (.+) was killed by (.+)\.', line)
if result:
print(result.group(0))
hh, mm, ss = int(result.group(1)), int(result.group(2)), int(result.group(3))
return DeathEvent(hh * 3600 + mm * 60 + ss, result.group(4), result.group(5))
class ResurrectEvent(Event):
def __init__(self, time, player, by):
super().__init__(time, EventType.RESURRECT, player)
self.by = by
@staticmethod
def parse_string(line):
result = re.match(r'\[(\d+):(\d+):(\d+)\]: \[Resurrect Announcement\] (.+) was resurrected by (.+)\.$', line)
if result:
print(result.group(0))
hh, mm, ss = int(result.group(1)), int(result.group(2)), int(result.group(3))
return ResurrectEvent(hh * 3600 + mm * 60 + ss, result.group(4), result.group(5))
Events = [JoinEvent, LeaveEvent, SayEvent, DeathEvent, ResurrectEvent]
def parse_input(line):
for evt in Events:
e = evt.parse_string(line)
if e: return e
return None
def analyze_join_leave(events):
player_duration = {}
online_players = {}
for e in events:
if not e: continue
if e.evtype == EventType.JOIN:
if not e.player in player_duration:
player_duration[e.player] = 0
online_players[e.player] = e.time
elif e.evtype == EventType.LEAVE:
if not e.player in player_duration:
print('Warn: incomplete log, isolated leave event detected.')
player_duration[e.player] = e.time
else:
player_duration[e.player] += e.time - online_players[e.player]
del online_players[e.player]
formatted_duration = {d[0]: [d[1], "%02d:%02d:%02d" % (d[1] / 3600, d[1] / 60 % 60, d[1] % 60)] for d in player_duration.items()}
return {
'players': list(player_duration.keys()),
'online_players': list(online_players.keys()),
'longest_duration': max(formatted_duration, key=operator.itemgetter(1)),
'duration': formatted_duration
}
def analyze_say(events):
player_speak_freq = {}
for e in events:
if not e: continue
if e.evtype == EventType.SAY:
if not e.player in player_speak_freq:
player_speak_freq[e.player] = 0
player_speak_freq[e.player] += 1
return {
'frequency': player_speak_freq,
'chatterbox': max(player_speak_freq.items(), key=operator.itemgetter(1))[0]
}
def analyze_death_resurrect(events):
player_death_freq = {}
player_resurrect_freq = {}
dead_players = {}
players = set()
for e in events:
if not e: continue
if e.evtype == EventType.DEATH:
dead_players[e.player] = e.time
if not e.player in player_death_freq:
players.add(e.player)
player_death_freq[e.player] = [1, { e.reason: 1 }, {}]
else:
player_death_freq[e.player][0] += 1
if not e.reason in player_death_freq[e.player][1]:
player_death_freq[e.player][1][e.reason] = 0
player_death_freq[e.player][1][e.reason] += 1
if e.evtype == EventType.RESURRECT:
del dead_players[e.player]
if not e.by in player_resurrect_freq:
players.add(e.by)
player_resurrect_freq[e.by] = [1, { e.player: 1 }]
else:
player_resurrect_freq[e.by][0] += 1
if not e.player in player_resurrect_freq[e.by][1]:
player_resurrect_freq[e.by][1][e.player] = 0
player_resurrect_freq[e.by][1][e.player] += 1
if e.player in player_death_freq:
if not e.by in player_death_freq[e.player][2]:
player_death_freq[e.player][2][e.by] = 0
player_death_freq[e.player][2][e.by] += 1
def get_player_death_count(p):
return player_death_freq[p][0] if p in player_death_freq else 0
def get_player_death_reasons(p):
return player_death_freq[p][1] if p in player_death_freq else {}
def get_player_resurrected_by(p):
return player_death_freq[p][2] if p in player_death_freq else {}
def get_player_resurrect_count(p):
return player_resurrect_freq[p][0] if p in player_resurrect_freq else 0
def get_player_resurrect_stat(p):
return player_resurrect_freq[p][1] if p in player_resurrect_freq else {}
formatted_per_player = {
p: {
'death': get_player_death_count(p),
'death_reasons': get_player_death_reasons(p),
'death_resurrected_by': get_player_resurrected_by(p),
'resurrect': get_player_resurrect_count(p),
'resurrect_stat': get_player_resurrect_stat(p),
'most_resurrected_by': list(max(get_player_resurrected_by(p).items(), key=operator.itemgetter(1))),
'most_resurrect': list(max(get_player_resurrect_stat(p).items(), key=operator.itemgetter(1)))
} for p in players}
most_dead = max(player_death_freq.items(), key=lambda x: x[1][0])
most_helpful = max(player_resurrect_freq.items(), key=lambda x: x[1][0])
most_bonded = max([(p[0], max(p[1][1].items(), key=lambda x: x[1])) for p in player_resurrect_freq.items()], key=lambda t: t[1][1])
return {
'per_player': formatted_per_player,
'dead_players': list(dead_players.keys()),
'most_dead': [most_dead[0], most_dead[1][0]],
'most_helpful': [most_helpful[0], most_helpful[1][0]],
'most_bonded': [most_bonded[0], most_bonded[1][0], most_bonded[1][1]]
}
def main():
if len(sys.argv) != 3:
print('Usage: %s LOG_FILE_NAME OUTPUT_FILE_NAME' % sys.argv[0])
return
fin = open(sys.argv[1], encoding='utf-8')
if not fin:
print('Cannot open %s' % sys.argv[1])
events = list(map(lambda l: parse_input(l), fin))
output = {
'JOIN_LEAVE_STAT': analyze_join_leave(events),
'SAY_STAT': analyze_say(events),
'DEATH_RESURRECT_STAT': analyze_death_resurrect(events)
}
with open(sys.argv[2], 'w') as fout:
fout.write(json.dumps(output, indent=4, ensure_ascii=False))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment