A custom recovery script for ZODB FileStorages
# ZODB recovery script that 'rebases' transactions onto an older copy of the same
# FileStorage. This is useful in the case where a FileStorage is lightly corrupted
# but the corruption goes undetected and the file remains in production, garnering
# new transactions.
#
# The corruption in the storage may have prevented various transactions from
# completing, and prevents packing or incremental backups, so a backup taken
# before the corruption happened is used as a graft for the newer transactions.
# This only works if all the corruption is confined to transactions covered by
# the backup; corruption that affects the transactions being appended is not
# recoverable in this fashion.
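#
# Schematically (illustrative):
#
#   old backup : [T1][T2] ... [Tb]                      known good up to Tb
#   new Data.fs: [T1][T2] ... [Tb][Tb+1] ... [Tn]       corrupt somewhere before Tb
#   result     : backup [T1 .. Tb] + grafted [Tb+1 .. Tn], with pointers rewritten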
#
# (c) Matthew Wilkes, 2018. GPLv2.
import math
import shutil
import sys
import time

from ZODB.FileStorage import FileStorage
from ZODB.FileStorage.format import FileStorageFormatter
from ZODB.fsIndex import fsIndex
from ZODB.fstools import TxnHeader
from ZODB.serialize import referencesf
# A file that represents the newest state of the database, that may be corrupted
new_fs = '/mnt/zeo/blob/filestorage/Data.fs'
# A known good state, from an old backup
old_fs = '/mnt/zeo/bak/for_recovery/mwilkes/weekly_datafs_old/Data.fs.old'
# Output filename
restore_fs = '/mnt/zeo/bak/for_recovery/mwilkes/patched-20180818.fs'

# Last common transaction id
boundary_txn = '03c66e29bfc50dbb'.decode('hex')
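# A minimal sketch for discovering this tid, assuming the old backup is itself
# readable: walk the backup's transaction headers and take the last one.
#
#   with open(old_fs, 'rb') as f:
#       txn = TxnHeader(f, 4)  # transactions start after the 4-byte magic
#       nxt = txn.next_txn()
#       while nxt is not None:
#           txn = nxt
#           nxt = txn.next_txn()
#       print txn.tid.encode('hex')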
SHOULD_COPY = True
last_progress = 0

if SHOULD_COPY:
    print "Starting initial copy"
    # Copy the old FS wholesale into the restore FS, as this
    # section does not need any modification. shutil does the
    # bulk copy in large chunks, which is far faster than
    # copying transaction by transaction.
    shutil.copy(old_fs, restore_fs)
    print "Copied old fs"

# Open the restore file unbuffered, as we will be doing lots of
# small edits and re-reads, which would constantly invalidate
# the buffer
restore_file = open(restore_fs, 'rb+', 0)
try:
    if SHOULD_COPY:
        # Seek to the end of the restore file, so writes append
        restore_file.seek(0, 2)
        new_file = open(new_fs, 'rb')
        try:
            base_txn = TxnHeader(new_file, 4)
            current = base_txn
            reached_boundary = False
            # Scan through transactions looking for the known good one.
            # We could equally do this by using the last transaction, but
            # this method allows us to pick different graft points
            while current:
                if current.tid == boundary_txn:
                    # Record the position just past the boundary txn;
                    # everything from there on is grafted onto the backup
                    reached_boundary = current.next_txn()._pos
                    print "Reached boundary txn"
                    break
                else:
                    current = current.next_txn()
            if not reached_boundary:
                raise RuntimeError("Boundary txn not found in the new FileStorage")
            new_file.seek(reached_boundary)
            while True:
                # Copy a megabyte over at a time, as that's the default
                # buffer size
                data = new_file.read(2**20)
                if not data:
                    break
                restore_file.write(data)
                sys.stdout.write(".")
                sys.stdout.flush()
        finally:
            new_file.close()
print "Rewriting data header pointers" | |
restore_file.seek(0, 2) | |
final_pos = restore_file.tell() | |
formatter = FileStorageFormatter() | |
formatter._file = restore_file | |
# There is a check that timestamps are increasing, populate the last | |
# seen timestamp with 0 to fix the initial case | |
formatter.ltid = 0 | |
index = fsIndex() | |
if not SHOULD_COPY: | |
# Rewrite from the start of the file | |
rewrite_offset = 4 | |
else: | |
# Rewrite from the graft point | |
rewrite_offset = reached_boundary | |
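    # (SHOULD_COPY = False presumably supports re-running just this
    # pointer-rewrite pass over a file grafted by an earlier run.)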

    current = base_txn = TxnHeader(restore_file, rewrite_offset)
    while current is not None:
        # Show one . for each 1% of the file processed
        current_progress = (current._pos / float(final_pos)) * 100.0
        current_progress = math.floor(current_progress)
        if current_progress > last_progress:
            sys.stdout.write("." * int(current_progress - last_progress))
            sys.stdout.flush()
            last_progress = current_progress

        # Verify the txn header
        formatted_txn = formatter._read_txn_header(current._pos)
        formatter.checkTxn(formatted_txn, current._pos)

        # Find the current transaction boundary, so we know when we have
        # processed all data records
        next_txn = current._pos + current.length + 8

        # Rewrite the internal data headers
        data_offset = current.get_data_offset()

        # Continue looping over data records until we exceed the end of the
        # transaction. The trailing 8 bytes are the transaction's length,
        # stored again to allow backtracking; that is the relevant boundary.
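        # For reference, the transaction record layout being walked here is
        # roughly (per ZODB's FileStorage format; sizes in bytes):
        #
        #   [tid 8][tlen 8][status 1][ulen 2][dlen 2][elen 2][user/desc/ext]
        #   [data record] ... [data record][redundant tlen 8]
        #
        # so current._pos + current.length + 8 is the next transaction header.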
        while data_offset < (next_txn - 8):
            current_data = formatter._read_data_header(data_offset)
            # Rewrite this data header: tloc is the file position of the
            # enclosing transaction header
            current_data.tloc = current._pos
            # Recalculate the back-pointer to the previous record for
            # this oid, using the positions indexed so far
            current_data.prev = index.get(current_data.oid, 0)
            new_header = current_data.asString()
            # Overwrite the data header
            restore_file.seek(data_offset)
            restore_file.write(new_header)
            # Check that our new header is valid
            formatter.checkData(formatted_txn, current._pos, current_data, data_offset)
            # Index this oid's position, so we can fix future prev pointers
            index[current_data.oid] = data_offset
            # Advance to the next data record
            data_offset = data_offset + current_data.recordlen()

        # Advance to the next transaction record
        current = current.next_txn()
finally:
    restore_file.close()

print "Writing index"
# Opening the FileStorage scans the file to build its index, which is
# saved alongside the Data.fs when the storage is closed
fs = FileStorage(restore_fs)
print "Packing FileStorage"
fs.pack(time.time(), referencesf)
fs.close()
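
# A quick smoke test of the grafted file (an illustrative sketch, using the
# standard FileStorage iteration API; reading every record should surface
# any headers the rewrite left inconsistent):
#
#   fs = FileStorage(restore_fs, read_only=True)
#   for txn in fs.iterator():
#       for record in txn:
#           pass
#   fs.close()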