#!/usr/bin/env python3
"""
Neovim Undo File Viewer

Reads a Neovim undo file and displays the content at each undo stage.
"""

import argparse
import struct
import sys
from dataclasses import dataclass
from typing import BinaryIO, List, Optional


# Magic constants from Neovim source
UF_START_MAGIC = b"Vim\237UnDo\345"  # leading bytes of every undo file
# Derived from the literal so the two can never drift apart.
UF_START_MAGIC_LEN = len(UF_START_MAGIC)
UF_VERSION = 3  # only this on-disk format version is accepted
UF_HEADER_MAGIC = 0x5fd0  # precedes each undo header
UF_HEADER_END_MAGIC = 0x5454  # terminates the list of undo headers
UF_ENTRY_MAGIC = 0xf518  # precedes each entry (and each extmark record)
UF_ENTRY_END_MAGIC = 0x3581  # terminates an entry / extmark list
UNDO_HASH_SIZE = 32  # size in bytes of the hash stored after the version

# Optional field types
UF_LAST_SAVE_NR = 1  # optional field in the file header
UHP_SAVE_NR = 1  # optional field in an undo header; same value as UF_LAST_SAVE_NR


@dataclass
class UndoEntry:
    """Represents a single undo entry (change to the buffer).

    The entry describes one contiguous block of lines: ``top`` and ``bot``
    are the line numbers just outside the block, and ``lines`` holds the
    text saved for it.
    """

    top: int  # line number above the undo block (1-based, 0 = block starts at the first line)
    bot: int  # line number below the undo block (1-based, 0 = block runs to the end)
    lcount: int  # line count when u_save was called
    size: int  # number of lines in ue_array (the count read from the file)
    lines: List[str]  # the saved lines


@dataclass
class UndoHeader:
    """Represents an undo header (one undo/redo operation)."""

    seq: int  # sequence number read from the header; used to order headers
    entries: List["UndoEntry"]  # entries in the order they appear in the file


class UndoFileReader:
    """Parse a Neovim undo file and replay its entries onto a list of lines.

    Usage: construct with the path of the undo file, call :meth:`parse`, then
    inspect ``headers`` or call :meth:`get_file_at_seq` / :meth:`print_tree`.
    Set ``debug`` to True before parsing to trace the parse on stdout.
    """

    def __init__(self, filepath: str):
        """Remember *filepath*; the file is not opened until parse() runs."""
        self.filepath = filepath
        self.fp: Optional[BinaryIO] = None  # set only while parse() is running
        self.headers: List["UndoHeader"] = []
        self.line_count = 0
        self.debug = False

    # ------------------------------------------------------------------
    # Low-level readers (all integers are big-endian)
    # ------------------------------------------------------------------

    def read_bytes(self, n: int) -> bytes:
        """Read exactly *n* bytes; raise EOFError if fewer are available."""
        data = self.fp.read(n)
        if len(data) != n:
            raise EOFError(f"Expected {n} bytes, got {len(data)}")
        return data

    def read_byte(self) -> int:
        """Read a single byte and return its value, or -1 at end of file."""
        b = self.fp.read(1)
        if not b:
            return -1
        return b[0]

    def read_2c(self) -> int:
        """Read a 2-byte big-endian unsigned integer."""
        return struct.unpack('>H', self.read_bytes(2))[0]

    def read_4c(self) -> int:
        """Read a 4-byte big-endian unsigned integer."""
        return struct.unpack('>I', self.read_bytes(4))[0]

    def read_time(self) -> int:
        """Read an 8-byte big-endian unsigned time value."""
        return struct.unpack('>Q', self.read_bytes(8))[0]

    def read_string(self, length: int) -> str:
        """Read *length* bytes and decode them as UTF-8.

        Undecodable bytes are replaced rather than raising, so the result
        may not round-trip for non-UTF-8 buffers.
        """
        if length == 0:
            return ""
        return self.read_bytes(length).decode('utf-8', errors='replace')

    def read_pos(self):
        """Read a position and return it as an ``(lnum, col, coladd)`` tuple."""
        lnum = self.read_4c()
        col = self.read_4c()
        coladd = self.read_4c()
        return (lnum, col, coladd)

    def _read_optional_fields(self, known_type: int, trace: bool = False) -> Optional[int]:
        """Consume a run of optional fields; return the known field's value.

        Each field is a length byte, a type byte and a payload.  A zero
        length byte, or end of file, ends the run.  The field whose type is
        *known_type* always carries a 4-byte integer.  Any other field is
        skipped by consuming exactly ``length`` payload bytes: the length
        byte does not count the type byte (Neovim's reader skips with
        ``while (--len >= 0)``, i.e. ``len`` times).

        Args:
            known_type: type byte of the one field this reader understands.
            trace: print each field's length/type as it is read.

        Returns:
            The last value read for *known_type*, or None if it was absent.
        """
        value = None
        while True:
            opt_len = self.read_byte()
            if trace:
                print(f" Optional field len: {opt_len}, pos: {hex(self.fp.tell())}")
            if opt_len == 0 or opt_len == -1:
                break
            opt_type = self.read_byte()
            if trace:
                print(f" Optional field type: {opt_type}")
            if opt_type == known_type:
                value = self.read_4c()
            else:
                # Best effort: a short read here is detected by the next
                # read_byte() returning -1, which ends the loop.
                self.fp.read(opt_len)
        return value

    # ------------------------------------------------------------------
    # File parsing
    # ------------------------------------------------------------------

    def parse(self):
        """Parse the undo file at ``self.filepath`` into ``self.headers``.

        Raises:
            ValueError: the start magic or the version does not match.
            EOFError: the file ends in the middle of a fixed-size field.
            OSError: the file cannot be opened.
        """
        self.headers = []  # make repeated parse() calls idempotent
        with open(self.filepath, 'rb') as f:
            self.fp = f
            try:
                num_head = self._read_file_header()
                self._read_headers(num_head)
            finally:
                # Never leave a handle to a closed file behind.
                self.fp = None

    def _read_file_header(self) -> int:
        """Validate and consume the file header; return the header count."""
        magic = self.read_bytes(UF_START_MAGIC_LEN)
        if magic != UF_START_MAGIC:
            raise ValueError("Not a valid Neovim undo file")

        version = self.read_2c()
        if version != UF_VERSION:
            raise ValueError(f"Unsupported undo file version: {version}")

        self.read_bytes(UNDO_HASH_SIZE)  # hash field: read but not verified

        self.line_count = self.read_4c()
        if self.debug:
            print(f"Line count: {self.line_count}")

        # "U" line info: the text is present only when its length is
        # non-zero; the line number and column follow unconditionally.
        u_line_len = self.read_4c()
        if u_line_len > 0:
            self.read_bytes(u_line_len)
        self.read_4c()  # U line number
        self.read_4c()  # U line column

        # old / new / current header sequence numbers (not used here)
        for _ in range(3):
            self.read_4c()
        num_head = self.read_4c()
        seq_last = self.read_4c()
        seq_cur = self.read_4c()
        self.read_time()

        if self.debug:
            print(f"Number of headers: {num_head}")
            print(f"Current seq: {seq_cur}, Last seq: {seq_last}")

        self._read_optional_fields(UF_LAST_SAVE_NR)
        return num_head

    def _read_headers(self, num_head: int) -> None:
        """Read up to *num_head* undo headers and append them to ``headers``.

        Stops early, without raising, on end of file before a header magic,
        on the header-end magic, or on an unexpected magic value.
        """
        headers_read = 0
        while headers_read < num_head:
            try:
                magic = self.read_2c()
            except EOFError:
                if self.debug:
                    print(f"EOF after reading {headers_read} headers")
                break

            if magic == UF_HEADER_END_MAGIC:
                break

            if magic != UF_HEADER_MAGIC:
                if self.debug:
                    print(f"Warning: Expected header magic {hex(UF_HEADER_MAGIC)}, got {hex(magic)}")
                break

            header = self.read_header(headers_read, num_head)
            if header:
                self.headers.append(header)
            headers_read += 1

        if self.debug:
            print(f"Read {len(self.headers)} headers successfully")

    def read_header(self, index: int, total: int) -> Optional["UndoHeader"]:
        """Read a single undo header, starting just after its header magic.

        Only the sequence number and the entries are kept; cursor, marks,
        visual info, time and extmark records are consumed and discarded.

        Args:
            index: zero-based position of this header (debug output only).
            total: number of headers announced by the file (debug output only).

        Raises:
            EOFError: the file ends inside the header or one of its entries.
            ValueError: an entry contains an invalid line length.
        """
        if self.debug:
            print(f"Reading header {index+1}/{total}")

        # Links to other headers are stored as sequence numbers:
        # next, prev, alt_next, alt_prev.  They are not used here.
        for _ in range(4):
            self.read_4c()
        seq = self.read_4c()

        if self.debug:
            print(f" Sequence: {seq}")

        self.read_pos()  # cursor position
        self.read_4c()  # cursor virtual column
        self.read_2c()  # flags; no padding follows in the file format

        # 26 marks, 12 bytes each
        for _ in range(26):
            self.read_pos()

        # Visual info: start (12) + end (12) + mode (4) + curswant (4) = 32 bytes
        self.read_pos()
        self.read_pos()
        self.read_4c()
        self.read_4c()

        if self.debug:
            print(f" Position before time: {hex(self.fp.tell())}")
        self.read_time()
        if self.debug:
            print(f" Position after time: {hex(self.fp.tell())}")

        save_nr = self._read_optional_fields(UHP_SAVE_NR, trace=self.debug)
        if self.debug:
            if save_nr is not None:
                print(f" Save nr: {save_nr}")
            print(f" Position after optional fields: {hex(self.fp.tell())}")

        entries = []
        while True:
            magic = self.read_2c()
            if self.debug:
                print(f" Entry/end magic: {hex(magic)}")
            if magic == UF_ENTRY_END_MAGIC:
                if self.debug:
                    print(" Found entry end magic")
                break
            if magic != UF_ENTRY_MAGIC:
                # Unexpected value: stop reading entries without seeking
                # back and let the extmark scan below try to resynchronise.
                if self.debug:
                    print(f" Unknown magic {hex(magic)}, continuing to check for extmarks")
                break

            entry = self.read_entry()
            if entry:
                entries.append(entry)

        self._skip_extmarks()
        return UndoHeader(seq, entries)

    def _skip_extmarks(self) -> None:
        """Skip the extmark records that may follow a header's entries.

        Records start with the entry magic and the list ends with the
        entry-end magic.  If the next value is neither, it is pushed back so
        the caller can read it as the next header magic.  End of file inside
        this section is tolerated.
        """
        try:
            extmark_count = 0
            while True:
                magic = self.read_2c()
                if self.debug and extmark_count == 0:
                    print(f" First extmark section magic: {hex(magic)}")

                if magic == UF_ENTRY_END_MAGIC:
                    if self.debug:
                        print(f" Found extmark end magic after {extmark_count} extmarks")
                    break

                if magic != UF_ENTRY_MAGIC:
                    if self.debug:
                        print(f" No more extmarks (found {hex(magic)})")
                    self.fp.seek(-2, 1)  # whence=1: relative to current position
                    break

                extmark_count += 1
                extmark_type = self.read_4c()
                if self.debug and extmark_count == 1:
                    print(f" First extmark type: {extmark_type}")

                if extmark_type in (0, 1):  # kExtmarkSplice or kExtmarkMove
                    # Payload is the raw struct, taken to be 48 bytes.
                    # NOTE(review): depends on the writer's struct layout.
                    self.fp.seek(48, 1)
                else:
                    if self.debug:
                        print(f" Unknown extmark type {extmark_type}")
                    # NOTE(review): 20 is a guess for an unknown record
                    # size; parsing after this point may be misaligned.
                    self.fp.seek(20, 1)
        except EOFError:
            if self.debug:
                print(" EOF in extmark section")

    def read_entry(self) -> Optional["UndoEntry"]:
        """Read a single undo entry, starting just after its entry magic.

        Raises:
            ValueError: a line length has its high bit set, i.e. it would be
                negative as a signed 32-bit integer.
            EOFError: the file ends inside the entry.
        """
        top = self.read_4c()
        bot = self.read_4c()
        lcount = self.read_4c()
        size = self.read_4c()

        if self.debug:
            print(f" Entry: top={top}, bot={bot}, lcount={lcount}, size={size}")

        lines = []
        for _ in range(size):
            line_len = self.read_4c()
            if line_len >= 0x80000000:
                # read_4c() is unsigned, so report the signed value the
                # error message was written for.
                raise ValueError(f"Invalid line length: {line_len - 0x100000000}")
            lines.append(self.read_string(line_len))

        return UndoEntry(top, bot, lcount, size, lines)

    # ------------------------------------------------------------------
    # Replaying entries
    # ------------------------------------------------------------------

    def apply_changes(self, base_lines: List[str], header: "UndoHeader") -> List[str]:
        """Return a copy of *base_lines* with every entry of *header* applied.

        For each entry the block strictly between line ``top`` and line
        ``bot`` (1-based) is replaced by the entry's saved lines.  ``top``
        is the line above the block, so the block starts at 0-based index
        ``top``; ``bot`` is the line below it, so the block ends before
        0-based index ``bot - 1``.  ``bot == 0`` means the block runs to
        the end of the buffer.  *base_lines* itself is not modified.
        """
        lines = base_lines.copy()

        for entry in header.entries:
            start = entry.top
            end = len(lines) if entry.bot == 0 else entry.bot - 1
            # Slice assignment deletes the old block and inserts the saved
            # lines in one step.
            lines[start:end] = entry.lines

        return lines

    def get_file_at_seq(self, seq: int, initial_lines: Optional[List[str]] = None) -> List[str]:
        """Return the lines obtained by applying all headers up to *seq*.

        Headers are applied in ascending sequence order, starting from a
        copy of *initial_lines* (an empty buffer when omitted); headers
        with a sequence number greater than *seq* are ignored.
        """
        lines = [] if initial_lines is None else initial_lines.copy()

        for header in sorted(self.headers, key=lambda h: h.seq):
            if header.seq > seq:
                break
            lines = self.apply_changes(lines, header)

        return lines

    def print_tree(self):
        """Print a summary of every parsed header and its entries to stdout."""
        print(f"\nUndo file: {self.filepath}")
        print(f"Line count: {self.line_count}")
        print(f"Number of undo headers: {len(self.headers)}")
        print("\nUndo headers:")
        print("-" * 60)

        for header in sorted(self.headers, key=lambda h: h.seq):
            print(f"\nSequence {header.seq}:")
            print(f" Entries: {len(header.entries)}")
            for i, entry in enumerate(header.entries):
                print(f" Entry {i+1}:")
                print(f" Lines {entry.top}-{entry.bot}")
                # top and bot lie outside the block, hence the "- 1".
                print(f" Replaced {entry.bot - entry.top - 1 if entry.bot > 0 else 'all'} lines with {entry.size} lines")
                if entry.size <= 3:
                    for line in entry.lines:
                        preview = line[:60] + ('...' if len(line) > 60 else '')
                        print(f" {repr(preview)}")


def main() -> int:
    """Command-line entry point.

    Parses the undo file named on the command line and prints whatever
    views were requested (--tree, --seq, --last, --all).

    Returns:
        0 on success; 1 if parsing or display failed, or if --last was
        given but the file contains no undo headers.
    """
    parser = argparse.ArgumentParser(description='View Neovim undo file contents')
    parser.add_argument('undofile', help='Path to the Neovim undo file')
    parser.add_argument('--original', help='Path to the original file (for initial state)')
    parser.add_argument('--seq', type=int, help='Show file at specific sequence')
    parser.add_argument('--last', action='store_true', help='Show file at last sequence')
    parser.add_argument('--all', action='store_true', help='Show file at all sequences')
    parser.add_argument('--tree', action='store_true', help='Show undo tree structure')
    parser.add_argument('--debug', action='store_true', help='Enable debug output')

    args = parser.parse_args()

    try:
        reader = UndoFileReader(args.undofile)
        reader.debug = args.debug
        reader.parse()

        # Load original file if provided.  Failure is only a warning: the
        # views are then computed from an empty initial buffer.
        initial_lines = []
        if args.original:
            try:
                # Same codec and error policy the reader uses for lines
                # stored in the undo file, instead of the platform default.
                with open(args.original, 'r', encoding='utf-8', errors='replace') as f:
                    initial_lines = [line.rstrip('\n\r') for line in f]
            except (OSError, ValueError) as e:
                print(f"Warning: Could not read original file: {e}", file=sys.stderr)
            else:
                if args.debug:
                    print(f"Loaded {len(initial_lines)} lines from original file")

        # Show tree if requested
        if args.tree:
            reader.print_tree()

        # Show specific sequence
        if args.seq is not None:
            for line in reader.get_file_at_seq(args.seq, initial_lines):
                print(line)

        # Show last sequence
        if args.last:
            if not reader.headers:
                print("No undo headers found", file=sys.stderr)
                return 1
            last_seq = max(h.seq for h in reader.headers)
            for line in reader.get_file_at_seq(last_seq, initial_lines):
                print(line)

        # Show all sequences
        if args.all:
            for header in sorted(reader.headers, key=lambda h: h.seq):
                print(f"\n{'='*60}")
                print(f"File at sequence {header.seq}:")
                print('='*60)
                lines = reader.get_file_at_seq(header.seq, initial_lines)
                if not lines and not initial_lines:
                    print(" (empty or no original file provided)")
                else:
                    for i, line in enumerate(lines, 1):
                        print(f"{i:4}: {line}")

    except Exception as e:
        # Top-level boundary: report the failure and turn it into an exit
        # status.  A truncated file gets a more specific message.
        if isinstance(e, EOFError):
            print(f"Error: Unexpected end of file - {e}", file=sys.stderr)
        else:
            print(f"Error: {e}", file=sys.stderr)
        if args.debug:
            import traceback
            traceback.print_exc()
        return 1

    return 0


if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())