Last active
June 27, 2016 23:22
-
-
Save kmosher/2580d4f9257056786f29174c4c75da34 to your computer and use it in GitHub Desktop.
Turn a pg_dumpall file into a directory structure.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Turns the output of pg_dumpall into a directory structure. The | |
format is intended to be suitable for committing to git. | |
The original pg_dumpall file can be reconstructed easily from the | |
TABLE_OF_CONTENTS file like so: | |
cd DUMP_DIR; cat $( <TABLE_OF_CONTENTS ) | |
CAVEATS: | |
- Whatever directory you point the output at will have its | |
contents deleted without warning. | |
- Git is not a great file store for large data sets | |
Tested ad-hoc with python2.7 and 3.4. | |
Copyright 2016 Yelp Inc | |
Licensed under Apache License v2.0: http://www.apache.org/licenses/LICENSE-2.0 | |
""" | |
import argparse | |
import io | |
import os | |
import re | |
import sys | |
# Blocks larger than this are split into "-partN" files (49 MiB, safely
# under GitHub's per-file size limits).
FILE_SIZE_LIMIT = 51380224  # 49 MiB
# Matches pg_dump section header comments such as:
#   -- Name: foo; Type: TABLE; Schema: public; Owner: bar
# (optionally prefixed with "Data for " on data sections).
INFO_LINE_RE = re.compile(r'-- (Data for )?Name:[^;]*; Type:[^;]*; Schema[^;]*; Owner[^*]')
# Matches "ALTER TABLE ONLY <table> ALTER COLUMN <column> SET DEFAULT ..."
# so DEFAULT statements can be named after their table/column.
ALTER_TABLE_RE = re.compile(r'ALTER TABLE ONLY (?P<table>.*?) ALTER COLUMN (?P<column>.*?) SET DEFAULT')
def is_comment(line):
    """Return True if *line* (bytes) starts a SQL line comment."""
    return line[:2] == b'--'
def strip_comment(line):
    """Drop the leading comment marker ('-- ') from *line*."""
    marker_len = 3  # len('-- ')
    return line[marker_len:]
def normalize_name(name):
    """Sanitize *name* for use as a filename component."""
    # Spaces become single underscores, semicolons become double ones.
    underscored = re.sub(r'[;]', '__', name.replace(' ', '_'))
    # Drop everything that isn't a word char, parens, dot, or dash.
    return re.sub(r'[^\w().\-]', '', underscored)
def parse_info_comment(line):
    """Split a pg_dump info comment into a (name, type) pair.

    Lines that don't look like "-- Name: ...; Type: ...; ..." yield
    (comment_text, None) instead.
    """
    if INFO_LINE_RE.match(line):
        name_field, type_field = line.split(';')[:2]
        name = name_field.split(':')[1].strip()
        type_ = type_field.split(':')[1].strip()
        return name, type_
    return strip_comment(line), None
def name_from_comment(line):
    """Derive a relative output path from a pg_dump comment line (bytes)."""
    name, type_ = parse_info_comment(line.decode('utf-8').strip())
    if type_ is None:
        return normalize_name(name)
    # Group objects by pluralized type, e.g. tables/foo, indices/bar.
    subdir = pluralize(normalize_name(type_.lower()))
    return os.path.join(subdir, normalize_name(name))
def pluralize(word):
    """Return a naive English plural of *word* (used for directory names)."""
    irregular = {'index': 'indices'}
    if word in irregular:
        return irregular[word]
    if word.endswith('data'):
        # "table data" etc. is left alone.
        return word
    if word.endswith('y'):
        return word[:-1] + 'ies'
    return word + 's'
# TODO: Warn if dir doesn't have a TOC file
# (i.e. we might be deleting something we didn't write)
def initialize_dir(dir):
    """Create (if needed) and empty out *dir*, then chdir into it.

    All non-hidden files under *dir* are deleted and any emptied
    subdirectories are removed.  Exits the program if *dir* exists but
    is not a directory.
    """
    if not os.path.exists(dir):
        # makedirs (not mkdir) so a nested output path works even when
        # the intermediate directories don't exist yet.
        os.makedirs(dir)
    if not os.path.isdir(dir):
        print("%s is not a directory!" % dir)
        sys.exit(1)
    # Walk bottom-up so directories are already empty when we rmdir them.
    for root, dirs, files in os.walk(dir, topdown=False):
        for f in files:
            if not f.startswith('.'):
                os.remove(os.path.join(root, f))
        if root != dir:
            try:
                os.rmdir(root)
            except OSError:
                # Directory still holds hidden files; leave it in place.
                pass
    os.chdir(dir)
class LineCountingFile(object):
    """Iterator wrapper that tracks how many lines have been consumed.

    Attribute access other than iteration is delegated to the wrapped
    file object, so instances can stand in for the file itself.
    """

    def __init__(self, file_):
        self._file = file_
        self.line_number = 0

    def __getattr__(self, attr):
        # Fall through to the underlying file for anything we don't define.
        return getattr(self._file, attr)

    def __iter__(self):
        return self

    def __next__(self):
        # Count before fetching so error messages point at the line being
        # read (the counter advances even when StopIteration is raised).
        self.line_number += 1
        return next(self._file)

    # Python 2 iterator protocol.
    next = __next__
class SQLDumpSplitter(object):
    """Streams a pg_dumpall file, writing each object's SQL to its own file.

    Output is written relative to ``out_dir`` (initialize_dir() chdirs
    into it), and every file written is recorded, in order, in the
    TABLE_OF_CONTENTS file so the original dump can be reassembled with
    ``cat``.
    """

    def __init__(self, dump_file, out_dir):
        self.dump_file = LineCountingFile(dump_file)
        initialize_dir(out_dir)
        # Relative path (sans extension) the buffered SQL will be written to.
        self.current_filename = None
        # Database name from the last \connect line; prefixes filenames.
        self.current_database = None
        self.write_buffer = io.BytesIO()
        # Becomes True once the buffer holds non-comment, non-blank SQL.
        self.should_flush = False
        self.table_of_contents = open("TABLE_OF_CONTENTS", 'w')

    def error(self, msg):
        """Report a parse error with the current input line number and exit."""
        print("Error at line %d: %s" % (self.dump_file.line_number, msg))
        sys.exit(1)

    def update_filename(self, name):
        """Point the current output at *name*, under the current database."""
        if self.current_database is not None:
            self.current_filename = os.path.join(self.current_database, name)
        else:
            self.current_filename = name

    def name_current_file(self, part_count=None):
        """Pick a non-clashing ".sql" filename for the current block.

        Plain name collisions get a "(N)" suffix; multi-part files get a
        "-partN" suffix and must never collide.
        """
        if part_count is None:
            proposal = self.current_filename + ".sql"
        else:
            proposal = self.current_filename + "-part%d.sql" % part_count
        if not os.path.exists(proposal):
            return proposal
        elif part_count is not None:
            self.error(
                "Tried to write the same part (%d) twice in a multi-part file"
                % part_count)
        iteration = 1
        while os.path.exists(proposal):
            proposal = self.current_filename + "(%d).sql" % iteration
            iteration += 1
        return proposal

    def open_current_file(self, part_count=None):
        """Open the next output file for writing and record it in the TOC."""
        filename = self.name_current_file(part_count=part_count)
        self.table_of_contents.write(filename + "\n")
        dirname = os.path.dirname(filename)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        return open(filename, 'wb')

    def handle_new_db(self, line):
        """Flush the current block and switch output to a new database dir."""
        self.end_block(force_write=True)
        self.current_database = re.sub(r'^\\connect ', '', line.decode('utf-8').strip())
        print('Splitting DB "%s"' % self.current_database)
        self.start_block("PostgreSQL_connect")
        self._write(line)

    def handle_comment_block(self, line):
        """Consume a three-line pg_dump header comment ("--", info, "--").

        Flushes the previous block and names the next one from the
        header's Name/Type fields.
        """
        self.end_block()
        self._write(line)
        line = next(self.dump_file)
        if not line.startswith(b'--'):
            self.error("Expected line to be a comment")
        self.start_block(name_from_comment(line))
        self._write(line)
        line = next(self.dump_file)
        if line.rstrip() != b'--':
            self.error("Expected an empty comment line")
        self._write(line)

    def start_block(self, name):
        """Begin buffering a new named block."""
        self.update_filename(name)

    def end_block(self, force_write=False):
        """Write out the buffered block if it contains real SQL.

        Comment-only buffers are retained (the comments belong to the
        next block) unless *force_write* is set.
        """
        if not self.should_flush and not force_write:
            return
        if not self.current_filename:
            self.error("Found data before a comment block.")
        self._write_file()

    def finalize(self):
        """Flush any trailing buffered SQL and close the TOC file."""
        self.end_block(force_write=True)
        self.table_of_contents.close()

    def _write(self, data):
        # Parameter renamed from ``str`` so it no longer shadows the builtin.
        self.write_buffer.write(data)
        if not is_comment(data) and data.strip():
            self.should_flush = True

    def _write_file(self):
        """Dump the buffer to disk (splitting into parts if oversized)."""
        if self.write_buffer.tell() >= FILE_SIZE_LIMIT:
            self._write_parts()
        else:
            with self.open_current_file() as f:
                f.write(self.write_buffer.getvalue())
        self.write_buffer.close()
        self.write_buffer = io.BytesIO()
        self.should_flush = False

    def _write_parts(self):
        """Write the buffer as FILE_SIZE_LIMIT-sized parts, split on newlines."""
        part_count = 0
        self.write_buffer.seek(0)
        segment = self.write_buffer.read(FILE_SIZE_LIMIT)
        while segment:
            segment = self.backtrack_to_newline(segment)
            with self.open_current_file(part_count=part_count) as f:
                f.write(segment)
            part_count += 1
            segment = self.write_buffer.read(FILE_SIZE_LIMIT)

    def backtrack_to_newline(self, segment):
        """Trim *segment* to end on its last newline, rewinding the buffer.

        A segment with no newline at all (or one that already ends on a
        newline) is returned unchanged.
        """
        last_newline = segment.rfind(b'\n') + 1
        if last_newline and last_newline != len(segment):
            backtrack_len = len(segment) - last_newline
            self.write_buffer.seek(self.write_buffer.tell() - backtrack_len)
            return segment[:-backtrack_len]
        else:
            return segment

    def handle_next_line(self):
        """Dispatch one input line to the appropriate handler."""
        line = next(self.dump_file)
        if line.rstrip() == b'--':
            self.handle_comment_block(line)
        elif line.startswith(b'\\connect'):
            # Fixed from b'\connect': "\c" is an invalid escape sequence
            # (same bytes today, but a SyntaxWarning slated to become an
            # error in future Python versions).
            self.handle_new_db(line)
        else:
            # The comment for DEFAULT types only has the column name, so we make an
            # exception and get our hands dirty parsing sql so there's not 100
            # duplicated files all called 'id'
            match = ALTER_TABLE_RE.match(line.decode('utf-8'))
            if match:
                name = normalize_name('%s-%s' % (match.group('table'), match.group('column')))
                self.update_filename(os.path.join('defaults', name))
            self._write(line)

    def split(self):
        """Consume the whole dump file, then finalize."""
        try:
            while True:
                self.handle_next_line()
        except StopIteration:
            self.finalize()
            print("Finished splitting dump file")
def parse_args():
    """Parse CLI args: an optional dump file (default stdin) and -d out-dir."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d', '--out-dir',
        default=os.path.join(os.getcwd(), 'databases')
    )
    parser.add_argument(
        'pg_dumpall_file', nargs='?',
        type=argparse.FileType('rb'),
        # The splitter compares lines against bytes, but on Python 3 plain
        # sys.stdin is a text stream -- use its binary buffer when it exists
        # (Python 2's sys.stdin already yields bytes and has no .buffer).
        default=getattr(sys.stdin, 'buffer', sys.stdin)
    )
    return parser.parse_args()
def main():
    """Entry point: split the given pg_dumpall output into --out-dir."""
    args = parse_args()
    if args.pg_dumpall_file.isatty():
        print("Paste the pg_dumpall output followed by an EOF")
    out_dir = os.path.realpath(args.out_dir)
    splitter = SQLDumpSplitter(args.pg_dumpall_file, out_dir)
    splitter.split()
    return 0
# Script entry point; propagate main()'s return code as the exit status.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment