Last active
November 12, 2019 19:02
-
-
Save tianhuil/d10d4ad25718b6946a0e8d645a754b4c to your computer and use it in GitHub Desktop.
Data Sharding (useful preprocessing for dask)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip | |
import os | |
from itertools import islice | |
import argparse | |
# from https://stackoverflow.com/a/41333436/8930600
def grouper(iterable, n):
    """Lazily split *iterable* into consecutive tuples of at most *n* items.

    Every yielded tuple holds exactly *n* items except possibly the last,
    which holds whatever remains. Nothing is yielded for an empty input.
    """
    source = iter(iterable)

    def next_chunk():
        return tuple(islice(source, n))

    # The two-argument form of iter() keeps calling next_chunk() until it
    # returns the sentinel (an empty tuple), i.e. until the source is drained.
    yield from iter(next_chunk, ())
def records_iter(filename, limit=None):
    """Yield the lines of a gzip-compressed file, one record per line.

    Args:
        filename: Path of the gzip file to read.
        limit: Maximum number of lines to yield; ``None`` means no limit.

    Yields:
        Each line as ``bytes`` (the file is opened in binary mode), with its
        line terminator left in place.
    """
    with gzip.open(filename, 'rb') as fh:
        # islice stops after `limit` lines without reading the rest.
        yield from islice(fh, limit)
if __name__ == '__main__':
    # Split a gzip-compressed, line-oriented file into numbered gzip shards.
    # For an input "dir/name.ext" the shards are written to
    # "dir/name/0.ext", "dir/name/1.ext", ...
    parser = argparse.ArgumentParser()
    parser.add_argument("filename", help="input filename")
    # type=float lets the user pass scientific notation such as "1e6";
    # the values are truncated to int below.
    parser.add_argument("-l", "--limit", type=float, default=None,
                        help="limit the number of records processed (default None)")
    parser.add_argument("-s", "--shard-size", type=float, default=int(1e6),
                        help="shard size (default 1e6)")
    args = parser.parse_args()

    # Compare against None explicitly so that "--limit 0" means "no records"
    # rather than silently falling back to "no limit".
    limit = int(args.limit) if args.limit is not None else None
    if limit is not None and limit < 0:
        parser.error("--limit must be non-negative")

    shard_size = int(args.shard_size)
    if shard_size < 1:
        # A shard size of 0 would make grouper() yield nothing at all.
        parser.error("--shard-size must be at least 1")

    output_dir, ext = os.path.splitext(args.filename)
    if not ext:
        # Without an extension the output directory path would be identical
        # to the input file path, so the shards could not be created.
        parser.error("input filename must have an extension")

    def output_filename(k):
        """Return the path of shard number *k* inside the output directory."""
        return os.path.join(output_dir, '{}{}'.format(k, ext))

    # Re-running over an existing output directory is allowed.
    os.makedirs(output_dir, exist_ok=True)

    lines = records_iter(args.filename, limit=limit)
    for k, shard in enumerate(grouper(lines, shard_size)):
        with gzip.open(output_filename(k), 'wb') as fh:
            fh.writelines(shard)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment