Skip to content

Instantly share code, notes, and snippets.

@tianhuil
Last active November 12, 2019 19:02
Show Gist options
  • Save tianhuil/d10d4ad25718b6946a0e8d645a754b4c to your computer and use it in GitHub Desktop.
Data Sharding (useful preprocessing for dask)
import gzip
import os
from itertools import islice
import argparse
# from https://stackoverflow.com/a/41333436/8930600
def grouper(iterable, n):
    """Yield successive tuples of up to *n* items from *iterable*.

    Every tuple has exactly *n* items except possibly the last,
    which holds whatever remains. Nothing is yielded for an
    exhausted iterable.
    """
    src = iter(iterable)
    # Pulling one item with the for-loop detects exhaustion; the
    # remaining n-1 items of the batch come from islice on the
    # same iterator, so no element is consumed twice.
    for first in src:
        yield (first,) + tuple(islice(src, n - 1))
def records_iter(filename, limit=None):
with gzip.open(filename) as fh:
for i, line in enumerate(islice(fh, limit)):
yield line
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("filename", help="input filename")
    parser.add_argument("-l", "--limit", type=float, default=None,
                        help="limit the number of records processed (default None)")
    parser.add_argument("-s", "--shard-size", type=float, default=int(1e6),
                        help="shard size (default 1e6)")
    args = parser.parse_args()

    # Shards go into a directory named after the input file with its
    # extension stripped; each shard file keeps the original extension.
    output_dir, ext = os.path.splitext(args.filename)

    # type=float lets the CLI accept values like 1e5, but islice needs an
    # int. Compare against None explicitly so an explicit --limit 0 is
    # honored rather than silently treated as "no limit".
    limit = int(args.limit) if args.limit is not None else None

    def output_filename(k):
        """Path of the k-th shard inside the output directory."""
        return os.path.join(output_dir, '{}{}'.format(k, ext))

    # Replaces the try/os.mkdir/except FileExistsError dance; also removed
    # a dead, hard-coded `filename = './crawler/...'` assignment that was
    # never read (args.filename is used everywhere below).
    os.makedirs(output_dir, exist_ok=True)

    lines = records_iter(args.filename, limit=limit)
    for k, shard in enumerate(grouper(lines, int(args.shard_size))):
        # records_iter yields bytes, so 'w' on gzip.open (binary) is correct.
        with gzip.open(output_filename(k), 'w') as fh:
            for line in shard:
                fh.write(line)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment