Created
February 15, 2022 05:49
-
-
Save shunghsiyu/415058ce74a17294a78808b08d2d4abb to your computer and use it in GitHub Desktop.
Python script for reviewing usage history
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# Output the history of both bash commands and Firefox visits, merged in
# chronological order (oldest entry first).
import configparser | |
import logging | |
import os | |
import re | |
import shutil | |
import sqlite3 | |
import sys | |
import tempfile | |
from datetime import datetime, timedelta | |
from functools import partial | |
from heapq import heapify, heappop, heapreplace | |
class Color:
    """ANSI escape sequences wrapped around text that should stand out."""

    # SGR parameter 1: written immediately before emphasised text.
    BOLD = '\033[1m'
    # SGR parameters 1;37;0: written immediately after emphasised text.
    # NOTE(review): the trailing 0 presumably resets all attributes - confirm.
    END = '\033[1;37;0m'
def merge(*iterables):
    """Lazily merge already-sorted iterables into one ascending stream.

    Modified from https://codereview.stackexchange.com/a/187368

    Each heap entry is a mutable ``[front value, index, iterator]`` list.
    The index is unique per input, so two entries with equal front values
    are ordered by index and the iterators themselves are never compared.
    """
    heap = []
    for index, iterator in enumerate(map(iter, iterables)):
        try:
            first = next(iterator)
        except StopIteration:
            # Empty input: it contributes nothing to the merge.
            continue
        heap.append([first, index, iterator])
    heapify(heap)

    while heap:
        head = heap[0]
        yield head[0]
        try:
            # Refill the smallest entry in place from its own iterator.
            head[0] = next(head[2])
        except StopIteration:
            # That input is exhausted; drop its entry.
            heappop(heap)
        else:
            # Re-sift the updated entry to restore the heap invariant.
            heapreplace(heap, head)
class Firefox:
    """Read Firefox browsing history as ``(visit_time, output)`` pairs.

    Intended to be used as a context manager: entering copies the profile's
    ``places.sqlite`` to a temporary file, opens the copy with sqlite3 and
    returns an iterator over the visits, oldest first. Leaving closes the
    connection and deletes the temporary copy.
    NOTE(review): the copy is presumably made because a running Firefox
    keeps the original database locked - confirm.
    """

    # expanduser() honours $HOME when it is set but, unlike
    # os.environ['HOME'], does not raise KeyError at import time when it
    # is missing.
    PROFILE_DIR = os.path.join(
        os.path.expanduser('~'),
        ".mozilla",
        "firefox",
    )
    PROFILE_CONFIG = os.path.join(
        PROFILE_DIR,
        "profiles.ini",
    )
    # The LIKE pattern is a single-quoted string literal: in SQL, double
    # quotes denote identifiers, and SQLite only accepts them as strings
    # through a legacy fallback that can be disabled at build time.
    # visit_date is divided by 1,000,000 so that the result can be passed to
    # datetime.fromtimestamp(), i.e. it is treated as microseconds.
    QUERY = """
        SELECT title, url, visit_date/1000000 as time
        FROM moz_historyvisits
        LEFT JOIN moz_places ON moz_historyvisits.place_id = moz_places.id
        WHERE
            url NOT LIKE '%//www.google.com/url?%'
        ORDER BY visit_date ASC;
    """

    def __init__(self, profile_path=None):
        """Remember where the profile's history database lives.

        profile_path: directory of the Firefox profile to read. When falsy,
        the default profile from ``profiles.ini`` is used (see
        ``default_profile``).
        """
        if not profile_path:
            profile_path = self.__class__.default_profile()
        self.src_db_path = os.path.join(profile_path, 'places.sqlite')

    def __enter__(self):
        """Copy the database to a temporary file, open it, return an iterator."""
        tmp_fd, tmp_path = tempfile.mkstemp()
        os.close(tmp_fd)
        try:
            shutil.copy(self.src_db_path, tmp_path)
            conn = sqlite3.connect(tmp_path)
        except Exception:
            # Do not leave the temporary file behind when setup fails;
            # __exit__ is never called if __enter__ raises.
            os.unlink(tmp_path)
            raise
        # Every SQL statement executed on the connection is logged at DEBUG.
        conn.set_trace_callback(logging.debug)
        self.db_path = tmp_path
        self._conn = conn
        return iter(self)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close the connection and always remove the temporary copy."""
        try:
            self._conn.close()
        finally:
            os.unlink(self.db_path)

    def __iter__(self):
        """Yield ``(visit_time, output)`` for every row returned by QUERY.

        ``visit_time`` is a naive local ``datetime``; ``output`` is the text
        to print: the title in bold (when there is one) followed by a line
        with the visit time and the URL.
        """
        cur = self._conn.cursor()
        for title, url, visit_timestamp in cur.execute(self.QUERY):
            visit_time = datetime.fromtimestamp(visit_timestamp)
            logging.debug('%s: title=%s url=%s @%s', self.src_db_path, repr(title), repr(url), visit_time)
            output = ''
            if title:
                if title.startswith('💤'):
                    # Drop the marker and the single character after it.
                    # NOTE(review): presumably a "suspended tab" prefix
                    # followed by a space - confirm.
                    title = title[2:]
                output += Color.BOLD + title + Color.END + '\n'
            output += visit_time.strftime('%m/%d(%a) %H:%M') + ' <' + url + '>'
            output += '\n'
            yield visit_time, output

    @classmethod
    def default_profile(cls):
        """Return the directory of the profile flagged as default.

        Reads ``PROFILE_CONFIG`` and returns the path of the first section
        that has a ``Name`` and a true ``Default`` flag.

        Raises RuntimeError when no such section exists (including when the
        configuration file is missing, which ``ConfigParser.read`` ignores).
        """
        config = configparser.ConfigParser()
        config.read(cls.PROFILE_CONFIG)
        for profile in config.values():
            if not profile.get('name'):
                continue
            # configparser values are strings, so "0" is truthy; the flags
            # must be parsed with getboolean() rather than tested directly.
            try:
                is_default = profile.getboolean('default', fallback=False)
            except ValueError:
                # A "Default" value that is not a boolean flag.
                continue
            if not is_default:
                continue
            path = profile['path']
            # Joining is also harmless for an absolute path, because
            # os.path.join() discards everything before an absolute component.
            if profile.getboolean('isrelative', fallback=True):
                path = os.path.join(cls.PROFILE_DIR, path)
            return path
        raise RuntimeError('Cannot find default Firefox Profile!')
class Bash:
    """Read a bash history file as ``(timestamp, output)`` pairs.

    Intended to be used as a context manager: entering opens the history
    file and returns an iterator over its commands in file order; leaving
    closes the file. A line of the form ``#<digits>`` is read as the epoch
    timestamp of the command on the following line.
    """

    # expanduser() honours $HOME when it is set but, unlike
    # os.environ['HOME'], does not raise KeyError at import time when it
    # is missing.
    DEFAULT_PATH = os.path.join(
        os.path.expanduser('~'),
        ".bash_history",
    )
    TIMESTAMP_REGEX = re.compile(r'^#(\d+)')
    # Commands that are never reported.
    IGNORES = {'fg', 'bg', 'history', 'ls'}

    def __init__(self, history_path=DEFAULT_PATH):
        """history_path: the history file to read (default: DEFAULT_PATH)."""
        self.path = history_path

    def __enter__(self):
        """Open the history file and return an iterator over its commands."""
        # History files may contain arbitrary bytes; substitute undecodable
        # ones instead of aborting the whole run with UnicodeDecodeError.
        self._history_file = open(self.path, errors='replace')
        return iter(self)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close the history file."""
        self._history_file.close()

    def __iter__(self):
        """Yield ``(timestamp, output)`` for each reported command.

        ``timestamp`` is a naive local ``datetime``; ``output`` is the text
        to print: the command in bold followed by a line with its time.
        A line that is not a timestamp inherits the most recent timestamp.
        Skipped: commands seen before any timestamp, commands in IGNORES,
        and a command identical to the previously yielded one.
        """
        timestamp = None
        last_command = None
        for line in self._history_file:
            m = self.TIMESTAMP_REGEX.match(line)
            if m:
                timestamp = datetime.fromtimestamp(int(m.group(1)))
                logging.debug('%s: got timestamp %s from %s', self.path, timestamp, repr(line))
                # The file may end right after a timestamp line. A bare
                # next() would raise StopIteration inside this generator,
                # which Python turns into RuntimeError (PEP 479).
                command = next(self._history_file, None)
                if command is None:
                    break
            else:
                command = line
            command = command.strip()
            logging.debug('%s: cmd=%s @%s', self.path, repr(command), timestamp)
            if not timestamp or command == last_command or command in self.IGNORES:
                continue
            output = Color.BOLD + '$ ' + command + Color.END + '\n'
            output += timestamp.strftime('%m/%d(%a) %H:%M') + ' <bash>\n'
            yield timestamp, output
            last_command = command
def parse_date(date_str):
    """Convert a command-line date argument into a naive ``datetime``.

    Accepted forms (surrounding whitespace is ignored):

    * ``'now'`` - today at 00:00 local time (midnight, not the current
      instant).
    * ``'<N> days'`` or ``'<N> day'`` - today at 00:00 shifted by N days;
      N may be negative, e.g. ``'-7 days'``.
    * anything else is passed to ``datetime.fromisoformat``.

    Raises ValueError when the day count is not an integer or the string
    is not a valid ISO date.
    """
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    text = date_str.strip()
    if text == 'now':
        return today
    # ' days' is checked first; the singular form is accepted as well so
    # that '-1 day' works instead of failing in fromisoformat().
    for suffix in (' days', ' day'):
        if text.endswith(suffix):
            return today + timedelta(days=int(text[:-len(suffix)]))
    return datetime.fromisoformat(text)
def main(start, end):
    """Print merged Firefox and bash history entries within a time window.

    start, end: ``datetime`` bounds; an entry is printed when
    ``start <= timestamp < end``.

    Returns 0, the process exit status. A reader that closes the pipe early
    (for example ``| head``) ends the output quietly and is treated as
    normal termination.
    """
    def timestamp_range(entry):
        timestamp = entry[0]
        return start <= timestamp < end

    with Firefox() as firefox_history, Bash() as bash_history:
        entries = filter(timestamp_range, merge(firefox_history, bash_history))
        try:
            for _, output in entries:
                print(output)
            # Flush here so that a closed pipe is detected inside the try
            # block rather than during interpreter shutdown.
            sys.stdout.flush()
        except BrokenPipeError:
            # The original handler only named ``exit`` without calling it,
            # so printing carried on. Stop instead, and point stdout at
            # devnull so the flush at exit cannot raise the same error again.
            devnull = os.open(os.devnull, os.O_WRONLY)
            os.dup2(devnull, sys.stdout.fileno())
            os.close(devnull)
    return 0
if __name__ == '__main__':
    # Usage: script [START [END]]
    # START defaults to '-7 days' and END to 'now'; both go through
    # parse_date(). More than two arguments is an error.
    argc = len(sys.argv)
    if argc not in (1, 2, 3):
        logging.error('Invalid arguments %s', sys.argv)
        sys.exit(1)
    diff_str = sys.argv[1] if argc >= 2 else '-7 days'
    end_str = sys.argv[2] if argc == 3 else 'now'

    # Any non-zero integer in $DEBUG switches logging to DEBUG level.
    debug_mode = int(os.environ.get('DEBUG', '0')) != 0
    if debug_mode:
        log_level = logging.DEBUG
    else:
        log_level = logging.WARN
    logging.basicConfig(level=log_level)

    sys.exit(main(parse_date(diff_str), parse_date(end_str)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment