Skip to content

Instantly share code, notes, and snippets.

@cpcloud
Last active August 29, 2015 14:26
Show Gist options
  • Save cpcloud/1bd5f44087487f28ed40 to your computer and use it in GitHub Desktop.
Save cpcloud/1bd5f44087487f28ed40 to your computer and use it in GitHub Desktop.
Do vs Bag + Do
#!/usr/bin/env python
"""
Dask version of
https://hdfgroup.org/wp/2015/04/putting-some-spark-into-hdf-eos/
"""
from __future__ import print_function, division
import os
import glob
import time
import re
import csv
from contextlib import contextmanager
import h5py
import numpy as np
import dask.bag as db
from dask.array.chunk import topk
from dask import do
from toolz import memoize, concat
from toolz.compatibility import map, zip
@contextmanager
def data(filename):
    """Yield the non-fill air-temperature values stored in an HDF-EOS file.

    The file is opened read-only with h5py, the 2-D dataset
    ``/HDFEOS/GRIDS/NCEP/Data Fields/Tair_2m`` is read in full, and every
    element equal to the first entry of the dataset's ``_FillValue``
    attribute is dropped by boolean masking.  The file stays open until the
    ``with`` block using this context manager exits.
    """
    with h5py.File(filename, mode='r') as handle:
        tair = handle['/HDFEOS/GRIDS/NCEP/Data Fields/Tair_2m']
        # Pull the whole grid into memory before filtering it.
        values = tair[:, :]
        fill_value = tair.attrs['_FillValue'][0]
        is_valid = values != fill_value
        yield values[is_valid]
@memoize
def date_from_filename(filename):
    """Return the date embedded in a GSSTF NCEP file name as ``YYYY-MM-DD``.

    Parameters
    ----------
    filename : str
        Base name (no directory component) of the form
        ``GSSTF_NCEP.3.YYYY.MM.DD.he5``, optionally followed by a dot and a
        single digit (e.g. ``.he5.1``).

    Returns
    -------
    str
        The captured date with its dots replaced by dashes.

    Raises
    ------
    ValueError
        If ``filename`` does not follow the expected naming scheme.
    """
    match = re.match(r'^GSSTF_NCEP\.3\.(\d{4}\.\d{2}\.\d{2})\.he5(?:\.\d)?$',
                     filename)
    if match is None:
        # re.match returns None on failure; report the offending name rather
        # than failing with an AttributeError on ``None.group``.
        raise ValueError('unrecognized GSSTF NCEP file name: %r' % (filename,))
    return match.group(1).replace('.', '-')
def summarize(filename):
    """Compute summary statistics for the valid values of one file.

    Returns a one-element list holding the tuple
    ``(date, count, mean, median, std)``, where ``date`` comes from the
    file's base name and the statistics are taken over the values yielded
    by ``data(filename)``.
    """
    with data(filename) as values:
        basename = os.path.basename(filename)
        row = (
            date_from_filename(basename),
            len(values),
            np.mean(values),
            np.median(values),
            np.std(values),
        )
        # Wrapped in a list so the result is a sequence of rows.
        return [row]
def top10(filename):
    """Pair the ten largest and ten smallest valid values of one file.

    Returns an iterable of ``(date, hot, cold)`` rows, where ``date`` is
    derived from the file's base name, ``hot`` comes from ``topk(10, v)``
    and ``cold`` from ``-topk(10, -v)``.
    """
    basename = os.path.basename(filename)
    date = date_from_filename(basename)
    with data(filename) as values:
        hottest = topk(10, values)
        # Negating twice turns "largest of -v" into "smallest of v".
        coldest = -topk(10, -values)
        n_rows = len(hottest)
        assert n_rows == len(coldest), 'length of top and bottom not equal'
        dates = [date] * n_rows
        return zip(dates, hottest, coldest)
@do
def store(data, name, header=None):
    """Write groups of rows to ``csv/<name>.csv``.

    ``data`` is an iterable of row iterables; the groups are flattened with
    ``concat`` and written after a single header row.  When ``header`` is
    omitted an empty header row is written.  The function is wrapped with
    dask's ``do``; callers in this file call ``.compute()`` on its result.
    """
    columns = [] if header is None else header
    target = 'csv/%s.csv' % name
    with open(target, mode='w') as out:
        csv_out = csv.writer(out)
        csv_out.writerow(columns)
        rows = concat(data)
        csv_out.writerows(rows)
def doit(files, total_size):
    """Run both analyses using only ``do`` and report the wall-clock time.

    For each analysis (``top10`` then ``summarize``) every file is wrapped
    in a ``do`` call, the results are handed to ``store``, and the task is
    computed.  The timed span covers building the task as well as computing
    it.  Returns the two computed results as ``(hc, sm)``.
    """
    began = time.time()
    hotcold_task = store(map(do(top10), files),
                         'hotcold',
                         header=('date', 'hot', 'cold'))
    hc = hotcold_task.compute()
    elapsed = time.time() - began
    print('top/bottomk of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, elapsed))

    began = time.time()
    summary_task = store(map(do(summarize), files),
                         'summary',
                         header=('date', 'len', 'mean', 'median', 'std'))
    sm = summary_task.compute()
    elapsed = time.time() - began
    print(' summary of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, elapsed))

    return hc, sm
def bagit(files, total_size):
    """Run both analyses through a dask bag and report the wall-clock time.

    A bag is built from ``files``; ``top10`` and then ``summarize`` are
    mapped over it and the mapped bag is handed to ``store``.  Only the
    ``compute()`` call is timed; the task is built beforehand.  Returns the
    two computed results as ``(hc, sm)``.
    """
    file_bag = db.from_sequence(files)

    hotcold_task = store(file_bag.map(top10),
                         'hotcold',
                         header=('date', 'hot', 'cold'))
    began = time.time()
    hc = hotcold_task.compute()
    elapsed = time.time() - began
    print('top/bottomk of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, elapsed))

    summary_task = store(file_bag.map(summarize),
                         'summary',
                         header=('date', 'len', 'mean', 'median', 'std'))
    began = time.time()
    sm = summary_task.compute()
    elapsed = time.time() - began
    print(' summary of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, elapsed))

    return hc, sm
if __name__ == '__main__':
    # Only files whose on-disk size matches one of these byte counts are
    # processed.
    official_sizes = (16631632,)

    candidates = glob.glob('raw/*.he5') + glob.glob('raw/*.he5.?')
    files = []
    for path in candidates:
        if os.path.getsize(path) in official_sizes:
            files.append(path)
    total_size = sum(os.path.getsize(path) for path in files)

    # Benchmark 1: plain ``do``.
    print('do')
    print('==')
    doit(files, total_size)
    print()

    # Benchmark 2: dask bag feeding ``do``.
    print('bag + do')
    print('========')
    bagit(files, total_size)
    print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment