Created
January 25, 2024 11:44
-
-
Save arabcoders/6c9deca99f456c521bda3a74cd654ee0 to your computer and use it in GitHub Desktop.
A python script to save your bash history to sqlite db and allow you to export it back.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # -*- encoding: UTF8 -*- | |
| import logging | |
| import sqlite3 | |
| import os | |
| import sys | |
| class Bistorian: | |
| """ | |
| Import/export your bash history. | |
| """ | |
| bash_history: str = None | |
| db_file: str = None | |
| db_connection: sqlite3.Connection = None | |
| logger: logging.Logger = None | |
| def __init__(self, logger: logging.Logger, db_file: str = None, bash_history: str = None, mode: str = 'import', search: str = None) -> None: | |
| self.logger = logger | |
| self.db_file = os.path.expanduser(db_file) | |
| self.bash_history = os.path.expanduser(bash_history) | |
| self.mode = mode if mode else 'import' | |
| initialize: bool = False | |
| if not os.path.exists(self.db_file): | |
| os.makedirs(os.path.dirname(self.db_file), exist_ok=True) | |
| initialize = True | |
| self.db_connection = sqlite3.connect(self.db_file) | |
| self.db_connection.row_factory = sqlite3.Row | |
| if initialize: | |
| self.logger.info(f"Initializing history db [{self.db_file}].") | |
| for command in self.db_commands(): | |
| self.logger.debug(f"Executing [{command}].") | |
| self.db_connection.execute(command) | |
| if search: | |
| self.mode = 'search' | |
| if self.mode == 'import': | |
| self.import_history() | |
| elif self.mode == 'export': | |
| self.export_history() | |
| elif self.mode == 'search': | |
| self.search_history(command=search) | |
| else: | |
| print(f"Unknown mode: [{self.mode}].") | |
| sys.exit(1) | |
| def import_history(self): | |
| if not os.path.exists(self.bash_history): | |
| print(f"bash history file does not exist: [{self.bash_history}].") | |
| sys.exit(1) | |
| time = 0 | |
| command = '' | |
| entries_new = 0 | |
| entries_updated = 0 | |
| cursor = self.db_connection.cursor() | |
| with open(self.bash_history, 'r') as f: | |
| line_number = 0 | |
| for command in f.readlines(): | |
| line_number += 1 | |
| command = command.strip() | |
| if len(command) > 0 and command.startswith('#'): | |
| try: | |
| time = int(command[1:]) | |
| except: | |
| time = 0 | |
| continue | |
| entry = self.has_entry(cursor, command) | |
| if not command or command == '': | |
| self.logger.warning( | |
| f"Empty command at line {line_number}. skipping.") | |
| continue | |
| if entry and entry['command_ts'] <= time: | |
| continue | |
| self.save(cursor, time, command) | |
| if not entry: | |
| entries_new += 1 | |
| self.logger.debug(f"Adding ({time=}, {command=}).") | |
| else: | |
| entries_updated += 1 | |
| self.logger.debug(f"Updating ({time=}, {command=})") | |
| time = 0 | |
| if entries_new > 0 or entries_updated > 0: | |
| self.db_connection.commit() | |
| self.logger.info( | |
| f"New entries: {entries_new}, updated entries: {entries_updated}.") | |
| def has_entry(self, cursor: sqlite3.Cursor, command: str = None) -> bool: | |
| if not command: | |
| return None | |
| cursor.execute( | |
| 'SELECT * from bash_history WHERE command = ?', (command,)) | |
| fetch = cursor.fetchone() | |
| return fetch if fetch else None | |
| def save(self, cursor: sqlite3.Cursor, ts: int, command: str) -> None: | |
| cursor.execute( | |
| 'INSERT INTO bash_history (command, command_ts) VALUES (?, ?) ON CONFLICT(command) DO UPDATE SET command_ts=?, times=times+1', | |
| (command, ts, ts,) | |
| ) | |
| def export_history(self): | |
| cursor = self.db_connection.cursor() | |
| cursor.execute( | |
| 'SELECT "command", "command_ts" FROM bash_history ORDER BY "command_ts" ASC') | |
| for row in cursor: | |
| check_ts = int(row['command_ts']) | |
| if check_ts > 0: | |
| print(f"#{check_ts}") | |
| print(row['command']) | |
| def search_history(self, command: str): | |
| cursor = self.db_connection.cursor() | |
| cursor.execute( | |
| "SELECT * from bash_history WHERE command LIKE '%' || ? || '%' ORDER BY command_ts ASC", (command,)) | |
| has_results = False | |
| for row in cursor: | |
| has_results = True | |
| print(f"{row['command']}") | |
| if not has_results: | |
| print(f"No results for [{command}].") | |
| sys.exit(1) | |
| def db_commands(self) -> list: | |
| return [ | |
| 'CREATE TABLE "bash_history" ("id" integer PRIMARY KEY AUTOINCREMENT, "command" text NOT NULL, "command_ts" timestamp, "times" integer NOT NULL DEFAULT 1)', | |
| 'CREATE UNIQUE INDEX "bash_history_command" ON "bash_history" ("command")', | |
| ] | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser( | |
| description='Import/export your bash history.', | |
| epilog='You should add [HISTTIMEFORMAT="%F %T "] to your bash config to allow for timestamps to be saved.', | |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
| ) | |
| parser.add_argument('-d', '--db', type=str, | |
| help='Database file', | |
| default='~/.config/shell/bash/history.sqlite') | |
| parser.add_argument('-b', '--bash', type=str, | |
| help='Bash history file', | |
| default='~/.bash_history') | |
| parser.add_argument('-m', '--mode', type=str, | |
| help='Mode (import or export)', | |
| default='import', choices=['import', 'export'],) | |
| parser.add_argument("-l", "--log", type=str, choices=sorted(['DEBUG', 'INFO', 'WARNING', 'ERROR']), | |
| help=f"logging level.", default='INFO') | |
| parser.add_argument("search", type=str, nargs='?', | |
| help=f"Search history", default=None) | |
| args = parser.parse_args() | |
| numeric_level = getattr(logging, args.log.upper(), None) | |
| logging.basicConfig( | |
| level=numeric_level, format="%(asctime)s [%(levelname)-5.5s] %(message)s") | |
| log = logging.getLogger('Historian') | |
| Bistorian( | |
| db_file=args.db, bash_history=args.bash, | |
| mode=args.mode, | |
| logger=log, | |
| search=args.search, | |
| ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment