Created
October 3, 2014 20:54
-
-
Save also/9f823d9eb9dc0a410796 to your computer and use it in GitHub Desktop.
Split Kafka log files into chunks that Kafka can actually read.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# written in anger at 4AM. it's a bad idea to use this.
"""Split Kafka ``.log`` segment files into smaller segment files.

Usage::

    python split_kafka_logs.py SOURCE_DIR DEST_DIR TOPIC

For every partition directory ``SOURCE_DIR/TOPIC-<n>`` this copies each
``*.log`` file record by record into ``DEST_DIR/TOPIC-<n>/``.  A new output
file, named after its first record's offset zero-padded to 20 digits, is
started for each input file and whenever the current output file has grown
past ``max_segment_bytes``.

Only ``.log`` files are written; no ``.index`` files are produced.
See https://issues.apache.org/jira/browse/KAFKA-1670 for background.
"""
import glob
import os
import struct
import sys

# Each record is an 8-byte big-endian signed offset, a 4-byte big-endian
# signed payload size, then `size` payload bytes.
_OFFSET = struct.Struct('>q')
_SIZE = struct.Struct('>i')

# Roll-over threshold for an output file (1 GiB), as in the original script.
DEFAULT_MAX_SEGMENT_BYTES = 1024 * 1024 * 1024


def _read_records(f):
    """Yield ``(offset, offset_bytes, size_bytes, payload)`` for each record.

    *f* must be opened in binary mode.  Stops cleanly at end of file.

    Raises:
        ValueError: on a truncated header, a negative size, or a payload
            shorter than its declared size.
    """
    while True:
        offset_s = f.read(_OFFSET.size)
        if not offset_s:
            return  # clean end of file
        size_s = f.read(_SIZE.size)
        if len(offset_s) != _OFFSET.size:
            raise ValueError('bad offset size %d' % len(offset_s))
        if len(size_s) != _SIZE.size:
            raise ValueError('bad size size %d' % len(size_s))
        (size,) = _SIZE.unpack(size_s)
        (offset,) = _OFFSET.unpack(offset_s)
        if size < 0:
            # f.read() with a negative count would swallow the rest of the file.
            raise ValueError('bad message size %d at offset %d' % (size, offset))
        payload = f.read(size)
        if len(payload) != size:
            # Writing a partial record would produce an unreadable output log.
            raise ValueError('truncated message at offset %d: expected %d bytes, got %d'
                             % (offset, size, len(payload)))
        yield offset, offset_s, size_s, payload


def split_log_file(filename, dest_dir, max_segment_bytes=DEFAULT_MAX_SEGMENT_BYTES):
    """Copy the records of one ``.log`` file into size-bounded files in *dest_dir*.

    An output file ``<dest_dir>/<offset:020d>.log`` is opened for the first
    record, and again whenever the current output already holds more than
    *max_segment_bytes* bytes.  An output file can therefore exceed the limit
    by at most one record.  Prints ``done`` after the input is fully copied.

    Returns:
        The list of output file paths written, in creation order.  The list
        is empty for an empty input file.

    Raises:
        ValueError: if the input contains a malformed or truncated record.
    """
    outputs = []
    out = None
    try:
        with open(filename, 'rb') as f:
            for offset, offset_s, size_s, payload in _read_records(f):
                if out is None or out.tell() > max_segment_bytes:
                    if out is not None:
                        out.close()
                    path = os.path.join(dest_dir, '%020d.log' % offset)
                    out = open(path, 'wb')
                    outputs.append(path)
                out.write(offset_s)
                out.write(size_s)
                out.write(payload)
    finally:
        # `out` is still None when the input had no records.
        if out is not None:
            out.close()
    print('done')
    return outputs


def split_topic(source, dest, topic, max_segment_bytes=DEFAULT_MAX_SEGMENT_BYTES):
    """Split every ``*.log`` file of every ``<topic>-<n>`` directory under *source*.

    Output goes to ``<dest>/<topic>-<n>/``, which is created if missing.
    """
    prefix = topic + '-'
    for p_dir in sorted(glob.glob(os.path.join(source, prefix + '*'))):
        p_dir_basename = os.path.basename(p_dir)
        # `topic-*` also matches other topics such as `topic-foo-0`; keep only
        # directories whose suffix is a plain partition number.
        if not p_dir_basename[len(prefix):].isdigit():
            continue
        print(p_dir_basename)
        dest_dir = os.path.join(dest, p_dir_basename)
        if not os.path.isdir(dest_dir):
            os.makedirs(dest_dir)
        for filename in sorted(glob.glob(os.path.join(p_dir, '*.log'))):
            split_log_file(filename, dest_dir, max_segment_bytes)


def main(argv=None):
    """Command-line entry point: ``SOURCE_DIR DEST_DIR TOPIC``.  Returns an exit code."""
    if argv is None:
        argv = sys.argv
    if len(argv) < 4:
        sys.stderr.write('usage: %s SOURCE_DIR DEST_DIR TOPIC\n' % argv[0])
        return 2
    source, dest, topic = argv[1:4]
    split_topic(source, dest, topic)
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://issues.apache.org/jira/browse/KAFKA-1670