Proof of concept for downloading and deleting blob chunks from RelStorage. Use at your own risk.
#!/usr/bin/env python
##############################################################################
#
# Copyright (c) 2015 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""RelStorage Blob download utility. | |
Use case: | |
You have a RelStorage without shared blob directory, so with the blobs | |
stored in the relational database, and want to move to RelStorage WITH | |
shared blob directory. | |
One way to do that, would be to download the blobs into a new | |
directory and then start using that as the shared blob directory. | |
This script does that. It also, optionally, removes the blob_chunks | |
and large objects from the database. Use the --delete-blobs option | |
for this. | |
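
A sample invocation, assuming this file has been saved as
blob_download.py (the config file name and target directory are
placeholders):

  python blob_download.py --dry-run relstorage.conf /some/dir/var/blobstorage
  python blob_download.py --delete-blobs relstorage.conf /some/dir/var/blobstorage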

An alternative to the --delete-blobs option, at least on Postgres, is:

1. At a psql prompt, type `TRUNCATE blob_chunk;` to empty this table.
   This is near instantaneous.
2. As the postgres user (database admin), run `vacuumlo databasename`
   to clean up the large objects.  This can take a few minutes.
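
For example, from the shell, using the database name and user from
the sample config below:

  psql -U client -c 'TRUNCATE blob_chunk;' client_plone4
  sudo -u postgres vacuumlo client_plone4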

Note: a blob may be stored in multiple blob_chunks if it is very
large.  Restoring and deleting such blobs is untested, so currently
the script stops when it encounters one.

Sample config file:

  <relstorage source>
    blob-dir /some/dir/var/blobcache
    shared-blob-dir false
    <postgresql>
      dsn dbname='client_plone4' user='client' host='localhost' port='5432' password='secret'
    </postgresql>
  </relstorage>

new_blob_dir is the path to the target directory.  If it already
exists, newly downloaded blobs are added to it; use the --clear
option to delete it first and force a clean download.
"""
import ZConfig
import ZODB.blob
import ZODB.utils
import logging
import optparse
import os
import shutil
import sys
import tempfile
import time

from StringIO import StringIO
from ZODB import POSException
from ZODB.utils import p64
from relstorage.iter import fetchmany
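
# ZConfig schema for the configuration file.  It requires a single
# storage section named "source" (see the sample config in the module
# docstring).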
schema_xml = """ | |
<schema> | |
<import package="ZODB"/> | |
<import package="relstorage"/> | |
<section type="ZODB.storage" name="source" attribute="source" | |
required="yes" /> | |
</schema> | |
""" | |
log = logging.getLogger("blobdownload") | |
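
# Every blob chunk in the database.  A chunk_num greater than zero
# means a blob was split over multiple rows (this happens for very
# large blobs; see the note in the module docstring).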
all_blobs_stmt = """
SELECT zoid, tid, chunk_num
FROM blob_chunk
ORDER BY chunk_num
"""


def delete_blob_chunks(cursor, oids, options):
    """Delete the blob_chunks for the specified object ids.

    We use a bit of batching, though it is not clear that this is
    needed.  If someone wanted to add commit locks, plus maybe a pause
    between commits, this could be a good spot.  But the delete-blobs
    option should only be used when no one else is using the database
    anyway: you delete blobs because your current database uses a
    non-shared blobcache and you are about to switch to a shared
    blobstorage directory.  You should not be using the database at
    that point.

    The batching does give more sense of progress, which is good for
    your heart.
    """
    oids = list(oids)
    total = len(oids)
    if options.dry_run:
        log.info("Dry-run selected. Would delete %d blob_chunks from "
                 "the database.", total)
    else:
        log.info("Deleting %d blob_chunks from the database.", total)
    batch = 100
    log.info("Using batch size of %d.", batch)
    done = 0
    while oids:
        current_batch = oids[:batch]
        current_batch_len = len(current_batch)
        # The zoids are plain integers, so interpolating them directly
        # into the statement is safe here.
        oid_list = ','.join(str(oid) for oid in current_batch)
        del oids[:batch]
        if options.dry_run:
            stmt = "SELECT zoid FROM blob_chunk WHERE zoid IN (%s)" % (
                oid_list)
            cursor.execute(stmt)
            # Fetch them to see that it works.
            count = 0
            for row in fetchmany(cursor):
                count += 1
            done += count
            pct_complete = '%1.2f%%' % (done * 100.0 / total)
            log.info("Would delete %4d blob chunks. | %5d (%d total) | %7s",
                     count, done, total, pct_complete)
        else:
            stmt = "DELETE FROM blob_chunk WHERE zoid IN (%s)" % (oid_list)
            cursor.execute(stmt)
            done += current_batch_len
            pct_complete = '%1.2f%%' % (done * 100.0 / total)
            log.info("Deleted %4d oids. | %5d (%d total) | %7s",
                     current_batch_len, done, total, pct_complete)


def download_blobs(other, blob_dir, options):
    # Adapted from RelStorage.copyTransactionsFrom.
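    # FilesystemHelper manages the on-disk blob directory layout:
    # create() makes the directory and checkSecure() warns when its
    # permissions look too permissive.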
    fshelper = ZODB.blob.FilesystemHelper(blob_dir)
    fshelper.create()
    fshelper.checkSecure()
    if options.delete_blobs and other._is_read_only:
        msg = "ERROR: cannot delete blobs: source storage is read-only."
        sys.exit(msg)
    begin_time = time.time()
    stmt = all_blobs_stmt
    log.info("Counting the blobs.")
    total_blobs = 0
    other._lock_acquire()
    try:
        other._before_load()
        cursor = other._load_cursor
        cursor.execute(stmt)
        for zoid, tid, chunk_num in cursor.fetchall():
            total_blobs += 1
            if chunk_num > 0:
                msg = ("ERROR: zoid %s in tid %s has more than one chunk. "
                       "This is untested, so for safety this is "
                       "not supported yet. Stopping execution. "
                       "No blobs were downloaded or deleted." % (zoid, tid))
                sys.exit(msg)
    finally:
        other._lock_release()
    log.info("There are %d blobs.", total_blobs)
    num_blobs = 0
    oids_to_delete = set()
    if options.limit > -1:
        log.info("Limiting download to at most %d blobs.", options.limit)
        stmt += " LIMIT %d" % options.limit
        log.info("Downloading %d blobs.", min(options.limit, total_blobs))
    else:
        log.info("Downloading %d blobs.", total_blobs)
    other._lock_acquire()
    try:
        other._before_load()
        cursor = other._load_cursor
        cursor.execute(stmt)
        for zoid, tid, chunk_num in cursor.fetchall():
            if options.dry_run:
                oids_to_delete.add(zoid)
                num_blobs += 1
                continue
            # Note: this might not work well when there is another row
            # in blob_chunk with a different chunk_num.  That can
            # happen for really large files, 2GB on Postgres.
            # The zoids and tids in the result row are 64-bit long
            # integers.  We need to pack them into 8-byte strings.
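            # For example, p64(1) == '\x00\x00\x00\x00\x00\x00\x00\x01'
            # (big-endian, the form ZODB uses for oids and tids).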
            packed_zoid = p64(zoid)
            packed_tid = p64(tid)
            try:
                blobfile = other.openCommittedBlobFile(
                    packed_zoid, packed_tid)
            except POSException.POSKeyError:
                log.warn("POSKeyError on zoid %s, tid %s", zoid, tid)
                continue
            if blobfile is None:
                continue
            num_blobs += 1
            # Copy the blob to a temp file first and move it into place
            # afterwards, so a half-written file never ends up at the
            # final blob filename.
            fd, name = tempfile.mkstemp(
                suffix='.tmp',
                dir=fshelper.temp_dir)
            os.close(fd)
            target = open(name, 'wb')
            ZODB.utils.cp(blobfile, target)
            blobfile.close()
            target.close()
            fshelper.getPathForOID(packed_zoid, create=True)
            targetname = fshelper.getBlobFilename(packed_zoid, packed_tid)
            ZODB.blob.rename_or_copy_blob(name, targetname)
            oids_to_delete.add(zoid)
            pct_complete = '%1.2f%%' % (num_blobs * 100.0 / total_blobs)
            log.info("Downloaded zoid %d, tid %d | %5d blobs (%d total) | %7s",
                     zoid, tid, num_blobs, total_blobs, pct_complete)
        log.info("Done downloading to %s", blob_dir)
    finally:
        other._lock_release()
    # At the end, remove the blob chunks that were correctly downloaded.
    if not options.delete_blobs:
        log.info("--delete-blobs option not used. "
                 "Keeping %d blob_chunks in the database.",
                 len(oids_to_delete))
    else:
        other._lock_acquire()
        try:
            conn, cursor = other._adapter.connmanager.open()
            # Hold RelStorage's commit lock while deleting, so the
            # deletes cannot interleave with another writer.
            other._adapter.locker.hold_commit_lock(cursor)
            delete_blob_chunks(cursor, oids_to_delete, options)
            conn.commit()
            other._adapter.locker.release_commit_lock(cursor)
        finally:
            other._lock_release()
    elapsed = time.time() - begin_time
    log.info(
        "%d (out of %d) blobs successfully downloaded in %4.1f minutes.",
        num_blobs, total_blobs, elapsed / 60.0)
    if total_blobs > num_blobs:
        if options.limit > -1 and options.limit == num_blobs:
            log.info("Blob download was limited to %d, and that number "
                     "was downloaded.", options.limit)
        else:
            log.warn("%d blobs were not downloaded. There may have been "
                     "problems. Problematic blobs will not have been deleted.",
                     total_blobs - num_blobs)


class SaneDescriptionFormatter(optparse.IndentedHelpFormatter):

    def format_description(self, description):
        # No, we do not want to wrap the lines, thank you...
        return description


def main(argv=sys.argv):
    parser = optparse.OptionParser(
        description=__doc__,
        usage="%prog [options] config_file new_blob_dir",
        formatter=SaneDescriptionFormatter())
    parser.add_option(
        "--dry-run", dest="dry_run", action="store_true",
        help="Attempt to open the storage, then explain what would be done.")
    parser.add_option(
        "--clear", dest="clear", action="store_true",
        help="Delete the destination blob directory before "
             "downloading, to force a clean download.")
    parser.add_option(
        "--delete-blobs", dest="delete_blobs", action="store_true",
        help="Delete the blobs from the source database (blob_chunk table). "
             "Any blobs for which the download fails will be left in place. "
             "This is done at the end, when everything has been downloaded.")
    parser.add_option(
        "--limit", dest="limit", action="store", type="int",
        help="Download and/or delete at most this number of blobs. "
             "Default: no limit (technically: -1).")
    parser.set_defaults(
        dry_run=False, clear=False, delete_blobs=False, limit=-1)
    options, args = parser.parse_args(argv[1:])
    if len(args) != 2:
        parser.error("The name of a configuration file and a new blob dir "
                     "are required.")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
    if options.dry_run:
        log.info("Dry-run selected.")
    blob_dir = args[1]
    if os.path.exists(blob_dir):
        if options.clear:
            if options.dry_run:
                log.info("Dry-run selected. Would delete %s ...", blob_dir)
            else:
                log.info("Deleting %s ...", blob_dir)
                shutil.rmtree(blob_dir)
        else:
            log.info("Adding to existing blob dir %s", blob_dir)
    schema = ZConfig.loadSchemaFile(StringIO(schema_xml))
    config, handler = ZConfig.loadConfig(schema, args[0])
    source = config.source.open()
    log.info("Storage opened successfully.")
    download_blobs(source, blob_dir, options)
    source.close()


if __name__ == '__main__':
    main()