Last active
December 17, 2015 00:09
-
-
Save lahwran/a0224c67d6542d08462b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse
import contextlib
import errno
import json
import os
import signal
import struct
import subprocess
import time
def get_window():
    """Return the frontmost application's name and its main window title.

    Runs an AppleScript through ``osascript`` that asks System Events for
    the frontmost application process and the title of its main window.
    The script swallows errors while reading the title, in which case the
    title stays at its initial value ``"UNKNOWN WINDOW TITLE"``.

    Returns:
        An ``(app, window)`` tuple of text strings: the first and second
        lines of the script's output.

    Raises:
        subprocess.CalledProcessError: if ``osascript`` exits non-zero.
        OSError: if ``osascript`` cannot be started.
        IndexError: if the output contains fewer than two lines.
    """
    raw = subprocess.check_output(["osascript", "-e", r"""
        global frontApp, frontAppName, windowTitle
        set windowTitle to "UNKNOWN WINDOW TITLE"
        set frontApp to ""
        set frontAppName to ""
        tell application "System Events"
            set frontApp to first application process whose frontmost is true
            set frontAppName to name of frontApp
            try
                tell process frontAppName
                    tell (1st window whose value of attribute "AXMain" is true)
                        set windowTitle to value of attribute "AXTitle"
                    end tell
                end tell
            end try -- ignore errors
        end tell
        return frontAppName & "\n" & windowTitle
    """])
    # check_output returns raw bytes. Decode before splitting so the result
    # is text on every Python version and compares equal to names that were
    # round-tripped through JSON. Output is assumed to be UTF-8; undecodable
    # bytes are replaced rather than aborting the sample.
    text = raw.decode("utf-8", "replace")
    lines = text.split("\n")
    # Only the first two lines are used: line 0 is the application name,
    # line 1 the window title.
    return lines[0], lines[1]
class SignalHandlyThingy(object):
    """Defer SIGINT while a guarded section is running.

    On construction the instance records the current SIGINT handler and
    installs itself in its place. Calling the instance returns a context
    manager; a SIGINT that arrives inside the ``with`` body is remembered
    and forwarded to the previous handler only when the body finishes.
    Outside a guarded section the signal is forwarded immediately.

    Attributes:
        locked: True while a guarded section is active.
        triggered: ``(args, kwargs)`` of the pending signal, or None.
        old_signal_handler: whatever ``signal.getsignal`` returned at
            construction time (a callable, ``SIG_DFL``, ``SIG_IGN`` or None).
    """

    def __init__(self):
        self.locked = False
        self.triggered = None
        self.old_signal_handler = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, *a, **kw):
        """Record the signal; deliver it right away unless a section is guarded."""
        self.triggered = (a, kw)
        if not self.locked:
            self.tick()

    def tick(self):
        """Release the guard and forward a pending signal, if there is one."""
        self.locked = False
        if self.triggered is None:
            return
        a, kw = self.triggered
        self.triggered = None

        previous = self.old_signal_handler
        if callable(previous):
            previous(*a, **kw)
        elif previous == signal.SIG_DFL:
            # SIG_DFL is a marker, not a function: restore the default
            # disposition and re-send the signal so the OS applies it.
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            os.kill(os.getpid(), signal.SIGINT)
        # SIG_IGN (or None, for a handler not installed from Python): the
        # signal was being ignored before we took over, so drop it.

    @contextlib.contextmanager
    def __call__(self):
        """Context manager that holds back SIGINT for the duration of the body."""
        self.locked = True
        try:
            yield
        finally:
            # Always release the guard, even when the body raised; otherwise
            # every later SIGINT would be deferred forever.
            self.tick()
class IDStandin(object):
    """Persistent table that maps values to small integer ids and back.

    The table lives in a file with one JSON document per line; a value's id
    is its zero-based line number. Existing entries are loaded on
    construction and new ones are appended (and fsync'ed) as they are seen.

    Attributes:
        mapping: dict from value to id.
        rmapping: list of values, indexed by id.
        filename: path of the backing file.
        writer: append-mode binary file handle on ``filename``.
    """

    def __init__(self, filename):
        """Load ``filename`` if it exists, then open it for appending.

        Raises:
            IOError: if the file exists but cannot be opened.
            ValueError: if a line is not valid JSON, or if content follows
                a blank line (ids would no longer match line numbers).
        """
        self.mapping = {}
        self.rmapping = []
        self.max = 0  # never read in this file; kept for compatibility
        self.filename = filename
        try:
            reader = open(filename, "rb")
        except IOError as e:
            # A missing file just means an empty table.
            if e.errno != errno.ENOENT:
                raise
        else:
            with reader:
                self._load(reader)
        self.writer = open(self.filename, "ab")

    def _load(self, reader):
        """Fill the in-memory tables from an open binary file of JSON lines."""
        seen_blank = False
        for raw in reader:
            if not raw.strip():
                seen_blank = True
                continue
            if seen_blank:
                raise ValueError(
                    "%s: entry found after a blank line" % self.filename)
            item = json.loads(raw.decode("utf-8"))
            self.mapping[item] = len(self.rmapping)
            self.rmapping.append(item)

    def collapse(self, item):
        """Return the id for ``item``, assigning and persisting one if new.

        Byte strings are decoded as UTF-8 first so that a value seen in this
        run gets the same id as the text value reloaded from the file.
        """
        if isinstance(item, bytes):
            item = item.decode("utf-8")
        try:
            return self.mapping[item]
        except KeyError:
            pass
        index = len(self.rmapping)
        # The file is opened in binary mode, so encode explicitly.
        record = json.dumps(item).replace("\n", "").encode("utf-8") + b"\n"
        self.writer.write(record)
        self.writer.flush()
        os.fsync(self.writer.fileno())
        # Update the in-memory tables only after the entry is safely on
        # disk, so a failed write cannot hand out an unpersisted id.
        self.mapping[item] = index
        self.rmapping.append(item)
        return index

    def expand(self, id):
        """Return the value stored under ``id`` (IndexError if unknown)."""
        return self.rmapping[id]
class Writeythingy(object):
    """Append-only binary log of which application window is frontmost.

    Files inside ``logdir``:
        log      old JSON-lines log, read only by ``convert``
        b_log    binary log written by this class
        apps     id table for application names
        windows  id table for window titles

    Each ``b_log`` record is a packed big-endian ``(kind, app id, window id,
    time in milliseconds)`` tuple followed by a newline byte. ``kind`` is 0
    for a window sample and 1 for a quit marker.
    """

    _RECORD = struct.Struct(">BLLQ")
    _RECORD_END = b"\n"
    _KIND_TICK = 0
    _KIND_QUIT = 1

    def __init__(self, logdir):
        """Create ``logdir`` if needed and open the log and id tables.

        Also installs the SIGINT-deferring guard used by ``run``.
        """
        try:
            os.makedirs(logdir)
        except OSError as e:
            # An already existing directory is fine.
            if e.errno != errno.EEXIST:
                raise
        self.oldlogfile = os.path.join(logdir, "log")
        self.logfile = os.path.join(logdir, "b_log")
        self.appfile = os.path.join(logdir, "apps")
        self.winfile = os.path.join(logdir, "windows")
        self.apps = IDStandin(self.appfile)
        self.wins = IDStandin(self.winfile)
        self.signallock = SignalHandlyThingy()
        self.last = None
        self.writer = open(self.logfile, "ab")

    def write(self, line):
        """Append one packed record to the log and force it to disk.

        Raises:
            ValueError: if ``line`` is not exactly one record long; a
                wrong-sized record would misalign every later read.
        """
        if len(line) != self._RECORD.size:
            raise ValueError(
                "record must be %d bytes, got %d"
                % (self._RECORD.size, len(line)))
        self.writer.write(line + self._RECORD_END)
        self.writer.flush()
        os.fsync(self.writer.fileno())

    def tickmsg(self, appname, winname, t=None):
        """Log that ``winname`` of ``appname`` was frontmost at time ``t``.

        ``t`` is seconds since the epoch and defaults to now.
        """
        if t is None:
            t = time.time()
        appid = self.apps.collapse(appname)
        winid = self.wins.collapse(winname)
        self.write(
            self._RECORD.pack(self._KIND_TICK, appid, winid, int(t * 1000)))

    def quitmsg(self, t=None):
        """Log a quit marker at time ``t`` (seconds since epoch, default now)."""
        if t is None:
            t = time.time()
        self.write(self._RECORD.pack(self._KIND_QUIT, 0, 0, int(t * 1000)))

    def tick(self):
        """Sample the frontmost window once, then sleep to the next second.

        A record is written only when the sample differs from the last one.
        """
        cur = get_window()
        if self.last != cur:
            appname, winname = cur
            self.tickmsg(appname, winname)
        end = time.time()
        # Time left until the next whole-second boundary.
        remain = float(int(end) + 1) - end
        if remain > 0:
            time.sleep(remain)
        self.last = cur

    def convert(self):
        """Replay the old JSON-lines log into the binary log.

        Entries with a ``"w"`` key become window samples; all others become
        quit markers. Each entry's ``"time"`` is carried over.
        """
        with open(self.oldlogfile, "rb") as reader:
            for line in reader:
                if not line.strip():
                    continue
                decoded = json.loads(line.decode("utf-8"))
                # Named "stamp" so the local does not shadow the time module.
                stamp = decoded["time"]
                if "w" in decoded:
                    appname, winname = decoded["w"]
                    self.tickmsg(appname, winname, t=stamp)
                else:
                    self.quitmsg(t=stamp)

    def run(self):
        """Sample forever; write a quit marker when the loop is interrupted."""
        try:
            while True:
                # Hold back Ctrl-C until the current sample is fully written.
                with self.signallock():
                    self.tick()
        except BaseException:
            # Deliberately broad: KeyboardInterrupt and SystemExit must also
            # leave a quit marker. The exception is always re-raised.
            self.quitmsg()
            raise

    def loadall(self):
        """Decode the whole binary log, pretty-print it and return it.

        Returns:
            A list of dicts, each with ``"time"`` (seconds, float) and either
            ``"w": [appname, winname]`` or ``"quit": True``.

        Raises:
            ValueError: if the file ends in a partial record.
        """
        import pprint

        record_len = self._RECORD.size + len(self._RECORD_END)
        items = []
        with open(self.logfile, "rb") as reader:
            while True:
                chunk = reader.read(record_len)
                if not chunk:
                    break
                if len(chunk) != record_len:
                    raise ValueError(
                        "%s ends with a truncated record (%d of %d bytes)"
                        % (self.logfile, len(chunk), record_len))
                # Drop the trailing newline byte before unpacking.
                kind, appid, winid, t = self._RECORD.unpack(chunk[:-1])
                val = {"time": float(t) / 1000}
                if kind == self._KIND_TICK:
                    appname = self.apps.expand(appid)
                    winname = self.wins.expand(winid)
                    val["w"] = [appname, winname]
                else:
                    val["quit"] = True
                items.append(val)
        pprint.pprint(items)
        return items
def main(logfile, convert, read):
    """Run the logger in the mode selected on the command line.

    Only the directory part of ``logfile`` is used; it is passed to
    ``Writeythingy`` as the log directory.

    Args:
        logfile: path whose parent directory holds the log files.
        convert: if true, replay the old log into the binary log.
        read: if true (and ``convert`` is false), decode and return the
            binary log.

    Returns:
        The list produced by ``loadall`` in read mode, otherwise None.
    """
    logger = Writeythingy(os.path.dirname(logfile))

    # --convert takes precedence over --read, as in an if/elif chain.
    if convert:
        logger.convert()
        return None
    if read:
        return logger.loadall()

    logger.run()
    return None
# Command-line interface: one optional positional path plus two mode flags.
parser = argparse.ArgumentParser()
parser.add_argument(
    "logfile",
    nargs="?",
    default="~/.usagelog/log",
    # Expand "~" and make the path absolute before it is handed to main().
    type=lambda x: os.path.abspath(os.path.expanduser(x)),
)
parser.add_argument("--convert", action="store_true")
parser.add_argument("--read", action="store_true")

if __name__ == "__main__":
    args = parser.parse_args()
    r = main(logfile=args.logfile, convert=args.convert, read=args.read)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment