Last active
August 29, 2015 14:26
-
-
Save cpcloud/1bd5f44087487f28ed40 to your computer and use it in GitHub Desktop.
Do vs Bag + Do
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Dask version of | |
https://hdfgroup.org/wp/2015/04/putting-some-spark-into-hdf-eos/ | |
""" | |
from __future__ import print_function, division | |
import os | |
import glob | |
import time | |
import re | |
import csv | |
from contextlib import contextmanager | |
import h5py | |
import numpy as np | |
import dask.bag as db | |
from dask.array.chunk import topk | |
from dask import do | |
from toolz import memoize, concat | |
from toolz.compatibility import map, zip | |
@contextmanager
def data(filename):
    """Yield the valid ``Tair_2m`` values stored in an HDF-EOS5 file.

    The ``/HDFEOS/GRIDS/NCEP/Data Fields/Tair_2m`` dataset is read fully into
    memory. Cells equal to the dataset's ``_FillValue`` attribute are dropped,
    and the remaining values are yielded as a flat 1-D NumPy array.

    The HDF5 file is closed *before* the values are yielded, so the handle is
    not kept open while the caller computes on the in-memory copy.

    Parameters
    ----------
    filename : str
        Path to the ``.he5`` file.

    Yields
    ------
    numpy.ndarray
        1-D array of the values that are not equal to the fill value (all
        values when the dataset declares no ``_FillValue``).
    """
    with h5py.File(filename, mode='r') as f:
        dset = f['/HDFEOS/GRIDS/NCEP/Data Fields/Tair_2m']
        # ``dset[()]`` reads the whole dataset regardless of its rank.
        x = dset[()]
        fill_attr = dset.attrs.get('_FillValue')
    if fill_attr is None:
        # No fill value declared: treat every cell as valid.
        yield x.ravel()
    else:
        # The attribute may be stored as a 1-element array or as a scalar;
        # ``np.ravel`` normalises both so the first element can be taken.
        fill = np.ravel(fill_attr)[0]
        yield x[x != fill]
@memoize
def date_from_filename(filename):
    """Return the ISO date (``YYYY-MM-DD``) encoded in a GSSTF_NCEP file name.

    Accepts base names such as ``GSSTF_NCEP.3.2008.01.31.he5`` and the
    single-digit numbered variants such as ``GSSTF_NCEP.3.2008.01.31.he5.1``.
    Results are memoized per file name.

    Raises
    ------
    ValueError
        If *filename* does not follow that naming scheme.
    """
    match = re.match(r'^GSSTF_NCEP\.3\.(\d{4}\.\d{2}\.\d{2})\.he5(?:\.\d)?$',
                     filename)
    if match is None:
        raise ValueError('unrecognised GSSTF_NCEP file name: %r' % (filename,))
    return match.group(1).replace('.', '-')
def summarize(filename):
    """Summarise one file as a single CSV-ready row.

    Returns a one-element list ``[(date, count, mean, median, std)]`` computed
    over the values yielded by ``data``. The list wrapper lets ``store``
    flatten the per-file results with ``concat``.
    """
    with data(filename) as values:
        basename = os.path.basename(filename)
        row = (
            date_from_filename(basename),
            len(values),
            np.mean(values),
            np.median(values),
            np.std(values),
        )
        return [row]
def top10(filename):
    """Return up to ten (date, hot, cold) rows for one file.

    ``hot`` values come from ``topk(10, v)`` and ``cold`` values from
    ``-topk(10, -v)`` over the values yielded by ``data``. They are paired
    positionally in the order ``topk`` returns them. A file with no valid
    values yields no rows.

    The rows are returned as a list rather than a lazy ``zip`` iterator.
    The per-file result is then a concrete, re-iterable value, like the list
    returned by ``summarize``.
    """
    date = date_from_filename(os.path.basename(filename))
    with data(filename) as v:
        if len(v) == 0:
            # No valid values: emit no rows instead of passing an empty
            # array to topk.
            return []
        top, bottom = topk(10, v), -topk(10, -v)
    assert len(top) == len(bottom), 'length of top and bottom not equal'
    return [(date, hot, cold) for hot, cold in zip(top, bottom)]
@do
def store(data, name, header=None):
    """Write the concatenated per-file rows to ``csv/<name>.csv``.

    Decorated with ``do``, so calling it builds a task whose ``compute()``
    performs the write. The function itself returns None.

    Parameters
    ----------
    data : iterable of iterables of row tuples
        One iterable of rows per input file (as returned by ``summarize`` or
        ``top10``); they are flattened with ``concat``.
    name : str
        Output file stem; the file is written to ``csv/<name>.csv``.
    header : sequence of str, optional
        Column names written as the first row. When omitted or empty, no
        header row is written.
    """
    directory = 'csv'
    # Create the output directory up front. Otherwise a missing directory
    # would only fail here, after all upstream per-file work is done.
    try:
        os.makedirs(directory)
    except OSError:
        if not os.path.isdir(directory):
            raise
    with open(os.path.join(directory, '%s.csv' % name), mode='w') as f:
        writer = csv.writer(f)
        if header:
            writer.writerow(header)
        writer.writerows(concat(data))
def doit(files, total_size):
    """Benchmark the plain ``dask.do`` pipeline over *files*.

    Wraps ``top10`` and ``summarize`` with ``do``, applies them to every file,
    feeds the results into ``store``, and times each ``compute()``. Each graph
    is built before its timer starts, as in ``bagit``, so both benchmarks
    time only the computation.

    Parameters
    ----------
    files : list of str
        Input ``.he5`` paths.
    total_size : int
        Combined size of *files* in bytes (used only for reporting).

    Returns
    -------
    tuple
        The two ``compute()`` results (``store`` itself returns None).
    """
    delayed_top10 = do(top10)
    hc_dag = store([delayed_top10(fn) for fn in files],
                   'hotcold',
                   header=('date', 'hot', 'cold'))
    start = time.time()
    hc = hc_dag.compute()
    stop = time.time()
    print('top/bottomk of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, stop - start))

    delayed_summarize = do(summarize)
    sm_dag = store([delayed_summarize(fn) for fn in files],
                   'summary',
                   header=('date', 'len', 'mean', 'median', 'std'))
    start = time.time()
    sm = sm_dag.compute()
    stop = time.time()
    print(' summary of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, stop - start))
    return hc, sm
def bagit(files, total_size):
    """Benchmark the ``dask.bag`` + ``dask.do`` pipeline over *files*.

    The file paths are put in a bag, ``top10`` and ``summarize`` are mapped
    over it, and each mapped bag is handed to ``store``. Only the
    ``compute()`` call of each resulting graph is timed.

    Returns the two ``compute()`` results as ``(hotcold, summary)``.
    """
    def timed_compute(graph):
        # Run one graph and report how long the computation alone took.
        began = time.time()
        outcome = graph.compute()
        return outcome, time.time() - began

    paths = db.from_sequence(files)

    hotcold_graph = store(paths.map(top10), 'hotcold',
                          header=('date', 'hot', 'cold'))
    hc, elapsed = timed_compute(hotcold_graph)
    print('top/bottomk of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, elapsed))

    summary_graph = store(paths.map(summarize), 'summary',
                          header=('date', 'len', 'mean', 'median', 'std'))
    sm, elapsed = timed_compute(summary_graph)
    print(' summary of %d files total size %.2f GB took %.2f seconds' %
          (len(files), total_size / 1e9, elapsed))
    return hc, sm
if __name__ == '__main__':
    # Only files whose byte size matches one of official_sizes are used;
    # presumably this skips partial or unrelated downloads in raw/.
    official_sizes = (16631632,)
    # Sorted so output row order is deterministic (the names embed the date).
    candidates = sorted(glob.glob('raw/*.he5') + glob.glob('raw/*.he5.?'))
    sizes = {f: os.path.getsize(f) for f in candidates}
    files = [f for f in candidates if sizes[f] in official_sizes]
    if not files:
        raise SystemExit('no input files of the expected size found in raw/')
    total_size = sum(sizes[f] for f in files)
    print('do')
    print('==')
    doit(files, total_size)
    print()
    print('bag + do')
    print('========')
    bagit(files, total_size)
    print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment