@kmosher
Last active June 27, 2016 23:22
Turn a pg_dumpall file into a directory structure.
#!/usr/bin/env python
"""
Turns the output of pg_dumpall into a directory structure. The
format is intended to be suitable for committing to git.

The original pg_dumpall file can be reconstructed easily from the
TABLE_OF_CONTENTS file like so:

    cd DUMP_DIR; cat $( <TABLE_OF_CONTENTS )

CAVEATS:
- Whatever directory you point the output at will have its
  contents deleted without warning.
- Git is not a great file store for large data sets.

Tested ad-hoc with python2.7 and 3.4.

Copyright 2016 Yelp Inc
Licensed under Apache License v2.0: http://www.apache.org/licenses/LICENSE-2.0
"""
import argparse
import io
import os
import re
import sys

FILE_SIZE_LIMIT = 51380224  # 49 MiB
INFO_LINE_RE = re.compile(r'-- (Data for )?Name:[^;]*; Type:[^;]*; Schema[^;]*; Owner[^*]')
ALTER_TABLE_RE = re.compile(r'ALTER TABLE ONLY (?P<table>.*?) ALTER COLUMN (?P<column>.*?) SET DEFAULT')
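# Lines these patterns are meant to match (illustrative examples):
#   INFO_LINE_RE:
#     -- Name: users; Type: TABLE; Schema: public; Owner: postgres
#     -- Data for Name: users; Type: TABLE DATA; Schema: public; Owner: postgres
#   ALTER_TABLE_RE:
#     ALTER TABLE ONLY users ALTER COLUMN id SET DEFAULT nextval('users_id_seq'::regclass);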


def is_comment(line):
    return line.startswith(b'--')


def strip_comment(line):
    return line[3:]


def normalize_name(name):
    name = name.replace(' ', '_')
    name = re.sub(r'[;]', '__', name)
    return re.sub(r'[^\w().\-]', '', name)


def parse_info_comment(line):
    if not INFO_LINE_RE.match(line):
        return strip_comment(line), None
    fields = line.split(';')
    name = fields[0].split(':')[1].strip()
    type_ = fields[1].split(':')[1].strip()
    return name, type_


def name_from_comment(line):
    line = line.decode('utf-8').strip()
    name, type_ = parse_info_comment(line)
    name = normalize_name(name)
    if type_ is None:
        return name
    else:
        type_ = pluralize(normalize_name(type_.lower()))
        return os.path.join(type_, name)


def pluralize(word):
    if word == 'index':
        return 'indices'
    elif word.endswith('data'):
        return word
    elif word.endswith('y'):
        return word[:-1] + 'ies'
    else:
        return word + 's'
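

# For example (illustrative), name_from_comment() turns the header
#   -- Name: users; Type: TABLE; Schema: public; Owner: postgres
# into the relative path 'tables/users', while a 'TABLE DATA' header maps to
# 'table_data/users' (pluralize() leaves words ending in 'data' unchanged).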


# TODO: Warn if dir doesn't have a TOC file
# (i.e. we might be deleting something we didn't write)
def initialize_dir(dir):
    if not os.path.exists(dir):
        os.mkdir(dir)
    if not os.path.isdir(dir):
        print("%s is not a directory!" % dir)
        sys.exit(1)
    for root, dirs, files in os.walk(dir, topdown=False):
        for f in files:
            # leave dotfiles alone (e.g. a .git directory survives re-runs)
            if not f.startswith('.'):
                os.remove(os.path.join(root, f))
        if root != dir:
            try:
                os.rmdir(root)
            except OSError:
                pass
    os.chdir(dir)


class LineCountingFile(object):
    """Wraps a file object and counts lines as they are read, so error
    messages can point at a line number in the input."""

    def __init__(self, file_):
        self._file = file_
        self.line_number = 0

    def __getattr__(self, attr):
        return getattr(self._file, attr)

    def __iter__(self):
        return self

    def next(self):  # python2 iterator protocol
        return self.__next__()

    def __next__(self):
        self.line_number += 1
        return next(self._file)


class SQLDumpSplitter(object):
    def __init__(self, dump_file, out_dir):
        self.dump_file = LineCountingFile(dump_file)
        initialize_dir(out_dir)
        self.current_filename = None
        self.current_database = None
        self.write_buffer = io.BytesIO()
        self.should_flush = False
        self.table_of_contents = open("TABLE_OF_CONTENTS", 'w')

    def error(self, msg):
        print("Error at line %d: %s" % (self.dump_file.line_number, msg))
        sys.exit(1)

    def update_filename(self, name):
        if self.current_database is not None:
            self.current_filename = os.path.join(self.current_database, name)
        else:
            self.current_filename = name

    def name_current_file(self, part_count=None):
        if part_count is None:
            proposal = self.current_filename + ".sql"
        else:
            proposal = self.current_filename + "-part%d.sql" % part_count
        if not os.path.exists(proposal):
            return proposal
        elif part_count is not None:
            self.error(
                "Tried to write the same part (%d) twice in a multi-part file"
                % part_count)
        iteration = 1
        while os.path.exists(proposal):
            proposal = self.current_filename + "(%d).sql" % iteration
            iteration += 1
        return proposal

    def open_current_file(self, part_count=None):
        filename = self.name_current_file(part_count=part_count)
        self.table_of_contents.write(filename + "\n")
        dirname = os.path.dirname(filename)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        return open(filename, 'wb')
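
    # Naming examples (illustrative): the first block for a table is written
    # to 'tables/users.sql'; a second block that would collide with it gets
    # 'tables/users(1).sql'; and a block larger than FILE_SIZE_LIMIT is split
    # into 'tables/users-part0.sql', 'tables/users-part1.sql', and so on.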

    def handle_new_db(self, line):
        self.end_block(force_write=True)
        self.current_database = re.sub(r'^\\connect ', '', line.decode('utf-8').strip())
        print('Splitting DB "%s"' % self.current_database)
        self.start_block("PostgreSQL_connect")
        self._write(line)

    def handle_comment_block(self, line):
        self.end_block()
        self._write(line)
        line = next(self.dump_file)
        if not line.startswith(b'--'):
            self.error("Expected line to be a comment")
        self.start_block(name_from_comment(line))
        self._write(line)
        line = next(self.dump_file)
        if line.rstrip() != b'--':
            self.error("Expected an empty comment line")
        self._write(line)

    def start_block(self, name):
        self.update_filename(name)

    def end_block(self, force_write=False):
        if not self.should_flush and not force_write:
            return
        if not self.current_filename:
            self.error("Found data before a comment block.")
        self._write_file()

    def finalize(self):
        self.end_block(force_write=True)
        self.table_of_contents.close()

    def _write(self, line):
        self.write_buffer.write(line)
        if not is_comment(line) and line.strip():
            self.should_flush = True

    def _write_file(self):
        if self.write_buffer.tell() >= FILE_SIZE_LIMIT:
            self._write_parts()
        else:
            with self.open_current_file() as f:
                f.write(self.write_buffer.getvalue())
        self.write_buffer.close()
        self.write_buffer = io.BytesIO()
        self.should_flush = False

    def _write_parts(self):
        part_count = 0
        self.write_buffer.seek(0)
        segment = self.write_buffer.read(FILE_SIZE_LIMIT)
        while segment:
            segment = self.backtrack_to_newline(segment)
            with self.open_current_file(part_count=part_count) as f:
                f.write(segment)
            part_count += 1
            segment = self.write_buffer.read(FILE_SIZE_LIMIT)

    def backtrack_to_newline(self, segment):
        last_newline = segment.rfind(b'\n') + 1
        if last_newline and last_newline != len(segment):
            backtrack_len = len(segment) - last_newline
            self.write_buffer.seek(self.write_buffer.tell() - backtrack_len)
            return segment[:-backtrack_len]
        else:
            return segment
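
    # Worked example (illustrative): a FILE_SIZE_LIMIT-byte chunk read from
    # the buffer usually ends mid-line; backtrack_to_newline() trims the
    # partial line off the chunk and rewinds the buffer by the same amount,
    # so every part ends on a newline and the parts concatenate back into
    # the original block byte-for-byte.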

    def handle_next_line(self):
        line = next(self.dump_file)
        if line.rstrip() == b'--':
            self.handle_comment_block(line)
        elif line.startswith(b'\\connect'):
            self.handle_new_db(line)
        else:
            # The comment for DEFAULT types only contains the column name, so
            # we make an exception and get our hands dirty parsing SQL, rather
            # than end up with 100 duplicated files all called 'id'.
            match = ALTER_TABLE_RE.match(line.decode('utf-8'))
            if match:
                name = normalize_name('%s-%s' % (match.group('table'), match.group('column')))
                self.update_filename(os.path.join('defaults', name))
            self._write(line)

    def split(self):
        try:
            while True:
                self.handle_next_line()
        except StopIteration:
            self.finalize()
            print("Finished splitting dump file")


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d', '--out-dir',
        default=os.path.join(os.getcwd(), 'databases')
    )
    parser.add_argument(
        'pg_dumpall_file', nargs='?',
        type=argparse.FileType('rb'),
        # The splitter reads bytes, so fall back to the binary buffer that
        # python3 exposes on sys.stdin (python2's stdin is already binary).
        default=getattr(sys.stdin, 'buffer', sys.stdin)
    )
    return parser.parse_args()


def main():
    args = parse_args()
    if args.pg_dumpall_file.isatty():
        print("Paste the pg_dumpall output followed by an EOF")
    splitter = SQLDumpSplitter(
        args.pg_dumpall_file,
        os.path.realpath(args.out_dir))
    splitter.split()
    return 0


if __name__ == '__main__':
    sys.exit(main())
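
# Usage sketch (assumes this file was saved as 'split_pg_dumpall.py'):
#
#   pg_dumpall | python split_pg_dumpall.py -d ./databases
#   cd databases && git init && git add . && git commit -m 'initial dump'
#
# To rebuild a single dump file, as described in the docstring:
#
#   cd databases && cat $( <TABLE_OF_CONTENTS ) > ../pg_dumpall.sql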