Created
June 13, 2013 23:57
-
-
Save pomack/5778441 to your computer and use it in GitHub Desktop.
Convert a MongoDB BSON file to a Python file that can be imported.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import bson | |
import datetime | |
import struct | |
import sys | |
# Indentation unit; the dump_* functions repeat it once per nesting level
# when laying out multi-line lists and dicts in the generated output.
# NOTE(review): the literal shows as a single space here; the page extraction
# may have collapsed a wider run of spaces -- confirm against the original.
INDENT_SPACES = ' '
def read_bson_file(file, as_class=dict, tz_aware=True, uuid_subtype=bson.OLD_UUID_SUBTYPE):
    """Yield the documents stored back-to-back in a binary BSON stream.

    Each document is read as a 4-byte little-endian signed size, followed by
    the remaining ``size - 4`` bytes, the last of which must be the
    terminating zero byte.

    Args:
        file: Binary file-like object providing ``read(n)``.
        as_class: Forwarded unchanged to ``bson._elements_to_dict``.
        tz_aware: Forwarded unchanged to ``bson._elements_to_dict``.
        uuid_subtype: Forwarded unchanged to ``bson._elements_to_dict``.

    Yields:
        Whatever ``bson._elements_to_dict`` returns for each document.

    Raises:
        bson.InvalidBSON: If a declared size is too small to be a document,
            the stream ends before the declared size is reached, or the
            terminating zero byte is missing.
    """
    try:
        while True:
            size_prefix = file.read(4)
            if len(size_prefix) < 4:
                # End of input; a trailing partial size prefix is ignored.
                break
            obj_size = struct.unpack("<i", size_prefix)[0]
            # The declared size includes the 4-byte prefix and the trailing
            # zero byte. Anything smaller would turn the read below into a
            # negative (read-to-EOF) or empty read.
            if obj_size < 5:
                raise bson.InvalidBSON("objsize too small")
            body_size = obj_size - 4
            data = file.read(body_size)
            if len(data) < body_size:
                raise bson.InvalidBSON("objsize too large")
            if len(data) != body_size:
                raise bson.InvalidBSON("objsize incorrect")
            # Slice instead of index so the comparison is bytes-to-bytes on
            # both Python 2 and Python 3.
            if data[-1:] != bson.ZERO:
                raise bson.InvalidBSON("bad eoo")
            elements = data[:-1]
            yield bson._elements_to_dict(elements, as_class=as_class, tz_aware=tz_aware, uuid_subtype=uuid_subtype)
    except IOError as e:
        # Best effort: keep the documents already yielded and stop quietly,
        # but report the failure so a truncated dump does not go unnoticed.
        sys.stderr.write('Stopped reading BSON input: %s\n' % (e,))
        sys.stderr.flush()
def read_bson_filename(input_filename):
    """Open ``input_filename`` in binary mode and yield each document in it.

    The file stays open while the generator is being consumed and is closed
    when the ``with`` block exits.
    """
    with open(input_filename, 'rb') as bson_stream:
        documents = read_bson_file(bson_stream)
        for document in documents:
            yield document
try:  # Python 2 names
    _STRING_TYPES = (basestring,)
    _NUMBER_TYPES = (int, long, float)
except NameError:  # Python 3: ``basestring`` and ``long`` no longer exist
    _STRING_TYPES = (str,)
    _NUMBER_TYPES = (int, float)


def dump_value_to_python(fd, v, depth=0):
    """Write ``v`` to ``fd`` as a Python expression.

    Args:
        fd: Writable text file-like object.
        v: Value to serialise. Supported: ``None``, strings, numbers
            (including booleans), lists/tuples/sets (all written as list
            literals), dicts, ``datetime.datetime``, ``bson`` ObjectId and
            ``bson`` DBRef.
        depth: Current nesting level; nested containers are written one
            level deeper.

    Any other type is reported on stderr and the process exits with status 1.
    """
    if v is None:
        fd.write('None')
    elif isinstance(v, _STRING_TYPES):
        fd.write(repr(v))
    elif isinstance(v, _NUMBER_TYPES):
        if isinstance(v, float) and (v != v or v in (float('inf'), float('-inf'))):
            # repr() gives bare ``nan`` / ``inf`` / ``-inf``, which are not
            # valid Python names; wrap them so the output stays importable.
            fd.write("float('%r')" % v)
        else:
            fd.write(repr(v))
    elif isinstance(v, (list, tuple, set)):
        dump_list_to_python(fd, v, depth=depth+1)
    elif isinstance(v, dict):
        dump_dict_to_python(fd, v, depth=depth+1)
    elif isinstance(v, datetime.datetime):
        # Rebuild without tzinfo so repr() is a plain, importable
        # ``datetime.datetime(...)`` call.
        v = datetime.datetime(v.year, v.month, v.day, v.hour, v.minute, v.second, v.microsecond)
        fd.write(repr(v))
    elif isinstance(v, bson.objectid.ObjectId):
        fd.write('bson.objectid.ObjectId("')
        fd.write(str(v))
        fd.write('")')
    elif isinstance(v, bson.dbref.DBRef):
        # Serialise each part through this function: the collection name is
        # properly quoted and the id keeps its real type rather than being
        # assumed to be an ObjectId.
        fd.write('bson.dbref.DBRef(')
        dump_value_to_python(fd, v.collection, depth=depth)
        fd.write(', ')
        dump_value_to_python(fd, v.id, depth=depth)
        if v.database is not None:
            fd.write(', database=')
            dump_value_to_python(fd, v.database, depth=depth)
        fd.write(')')
    else:
        # Wrap in a tuple so tuple-like values cannot break %-formatting.
        sys.stderr.write('Unknown type: %r\n' % (v,))
        sys.stderr.flush()
        sys.exit(1)
def dump_list_to_python(fd, arr, depth=0):
    """Write ``arr`` to ``fd`` as a Python list literal.

    Sequences with more than one element are laid out one element per line,
    each followed by a comma, with the closing bracket on its own line at the
    current indentation. Empty and single-element sequences stay on one line.
    """
    closing_pad = INDENT_SPACES * depth
    multiline = len(arr) > 1

    fd.write('[')
    for element in arr:
        if multiline:
            fd.write('\n' + closing_pad + INDENT_SPACES)
        dump_value_to_python(fd, element, depth=depth)
        if multiline:
            fd.write(',')
    if multiline:
        fd.write('\n' + closing_pad)
    fd.write(']')
def dump_dict_to_python(fd, d, depth=0):
    """Write ``d`` to ``fd`` as a Python dict literal.

    Mappings with more than one entry are laid out one ``key: value`` pair
    per line, each followed by a comma, with the closing brace on its own
    line at the current indentation. Empty and single-entry mappings stay on
    one line.

    Args:
        fd: Writable text file-like object.
        d: Mapping to serialise; keys and values are written with
            ``dump_value_to_python``.
        depth: Current nesting level, used to compute the indentation.
    """
    indent = INDENT_SPACES * depth
    multiline = len(d) > 1

    fd.write('{')
    # ``items()`` exists on both Python 2 and Python 3; ``iteritems()`` is
    # Python 2 only.
    for k, v in d.items():
        if multiline:
            fd.write('\n')
            fd.write(indent)
            fd.write(INDENT_SPACES)
        dump_value_to_python(fd, k, depth=depth)
        fd.write(': ')
        dump_value_to_python(fd, v, depth=depth)
        if multiline:
            fd.write(',')
    if multiline:
        fd.write('\n')
        fd.write(indent)
    fd.write('}')
def dump_bson_to_python(input_filename, output_file):
    """Write every document of a BSON file to ``output_file`` as a Python module.

    The output starts with a fixed block of imports, followed by a single
    ``items`` list holding one dict literal per document read from
    ``input_filename``.
    """
    preamble = (
        'import bson\n',
        'import datetime\n',
        'import json\n',
        '\n',
        'items = [\n',
    )
    for line in preamble:
        output_file.write(line)

    for document in read_bson_filename(input_filename):
        # Each document is one list element, indented one level.
        output_file.write(INDENT_SPACES)
        dump_dict_to_python(output_file, document, depth=1)
        output_file.write(',\n')

    output_file.write(']\n')
def parse_args(argv=None):
    """Parse the command-line arguments.

    Args:
        argv: List of argument strings, without the program name. When
            ``None`` (the default), argparse reads ``sys.argv[1:]`` at call
            time rather than using a snapshot taken when this module was
            imported.

    Returns:
        A ``(input_filename, output_filename)`` tuple; ``output_filename``
        is ``None`` when ``--output`` is not given.
    """
    parser = argparse.ArgumentParser(description='Dump a BSON file to an importable Python file')
    parser.add_argument('--input', metavar='FILE', required=True, help='Input BSON filename')
    parser.add_argument('--output', metavar='FILE', help='Output python filename')
    options = parser.parse_args(argv)
    return options.input, options.output
def main(argv=None):
    """Convert the BSON file named on the command line to Python source.

    Args:
        argv: List of argument strings, without the program name. When
            ``None`` (the default), ``sys.argv[1:]`` is read at call time
            rather than captured once when this module was imported.

    Returns:
        ``0`` once the output has been written.
    """
    if argv is None:
        argv = sys.argv[1:]
    input_filename, output_filename = parse_args(argv)
    if output_filename:
        with open(output_filename, 'w') as fd:
            dump_bson_to_python(input_filename, fd)
    else:
        # No --output given: write the generated module to standard output.
        dump_bson_to_python(input_filename, sys.stdout)
    return 0
# Script entry point: run main() with the command-line arguments and use its
# return value as the process exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment