Skip to content

Instantly share code, notes, and snippets.

@martindurant
Created May 2, 2017 22:38
Show Gist options
  • Save martindurant/b23dcb8eb205e769707c1f58c46d13e0 to your computer and use it in GitHub Desktop.
Save martindurant/b23dcb8eb205e769707c1f58c46d13e0 to your computer and use it in GitHub Desktop.
datashader_by_hand
import dask
import dask.dataframe as dd
import numpy as np
import numba
import time
@numba.njit
def calc_hist(X, Y, xrange, yrange, out, w, h):
    """Accumulate a 2-D histogram of the points ``(X[i], Y[i])`` into ``out``.

    Every point is mapped onto a grid of ``w`` columns by ``h`` rows that
    spans ``xrange`` horizontally and ``yrange`` vertically, and the cell
    ``out[row, col]`` it falls into is incremented by one (``row`` is the
    y bin, ``col`` is the x bin).

    Parameters
    ----------
    X, Y : 1-D arrays of coordinates; ``Y`` is read at the same indices
        as ``X``.
    xrange, yrange : ``(low, high)`` pairs bounding the binned region.
        The scaling divides by ``high - low``, so each range must be
        non-degenerate.
    out : 2-D array, modified in place.  It should have at least ``h``
        rows and ``w`` columns.
    w, h : number of bins along x and y respectively.

    Notes
    -----
    * The upper edge of each range is inclusive: a value equal to
      ``high`` is counted in the last bin instead of indexing one cell
      past the end of the axis.
    * Points outside either range (including NaN coordinates) are skipped.
    * A bin that does not exist in ``out`` is never written, so an
      undersized buffer loses counts rather than corrupting memory.
    """
    x0, x1 = xrange
    y0, y1 = yrange
    nrows, ncols = out.shape
    for i in range(len(X)):
        x = X[i]
        y = Y[i]
        # Test the raw coordinates, not the truncated indices: int() rounds
        # toward zero, which would fold values just below the lower bound
        # into bin 0.  Written so that NaN fails the test as well.
        if not (x >= x0 and x <= x1 and y >= y0 and y <= y1):
            continue
        # A coordinate equal to the upper bound scales to exactly w (or h);
        # clamp it into the last bin.
        indx = min(int((x - x0) / (x1 - x0) * w), w - 1)
        indy = min(int((y - y0) / (y1 - y0) * h), h - 1)
        # Compiled code is not bounds-checked, so guard the write explicitly.
        if indx >= 0 and indy >= 0 and indx < ncols and indy < nrows:
            out[indy, indx] += 1
def hist(d, xrange, yrange, w, h):
    """Return the 2-D point-count histogram of one dataframe partition.

    Parameters
    ----------
    d : dataframe exposing ``metersnorth`` and ``meterswest`` columns.
        ``metersnorth`` is binned along the x axis (``xrange``, ``w`` bins)
        and ``meterswest`` along the y axis (``yrange``, ``h`` bins).
    xrange, yrange : ``(low, high)`` bounds passed through to ``calc_hist``.
    w, h : number of bins along x and y.

    Returns
    -------
    numpy.ndarray
        ``int32`` array of shape ``(h, w)`` holding the per-cell counts.
    """
    # The buffer must start at zero because calc_hist only ever increments
    # it; np.empty would leave arbitrary memory contents in the counts.
    # Shape is (rows, cols) == (h, w) to match calc_hist's
    # out[y_bin, x_bin] indexing.
    out = np.zeros((h, w), dtype='int32')
    calc_hist(d.metersnorth.values, d.meterswest.values, xrange, yrange,
              out, w, h)
    return out
if __name__ == '__main__':
    from dask import distributed

    def rasterize(width, height):
        """Histogram every partition on the cluster and sum the results."""
        tasks = [
            dask.delayed(hist)(part, (ymi, yma), (xmi, xma), width, height)
            for part in df.to_delayed()
        ]
        partials = client.gather(client.compute(tasks))
        # Stack the per-partition grids along a third axis and collapse it.
        return np.dstack(partials).sum(-1)

    # Local cluster: eight single-threaded worker processes.
    cluster = distributed.LocalCluster(n_workers=8, threads_per_worker=1)
    client = distributed.Client(cluster)

    # Stage 1: load the dataset into cluster memory and time it.
    t0 = time.time()
    df = dd.read_parquet('census.parquet')
    df = client.persist(df)
    distributed.wait(df)
    t1 = time.time()
    print('Persisted', round(t1 - t0, 1))

    # Data extents, used as the histogram bounds for both passes.
    west = df.meterswest.values
    xma, xmi = client.gather(client.compute([west.max(), west.min()]))
    north = df.metersnorth.values
    yma, ymi = client.gather(client.compute([north.max(), north.min()]))

    # Stage 2: full-resolution aggregation (the timing includes the
    # extent computation above, as it starts from t1).
    agg = rasterize(600, int(900 * 7.0 / 12))
    t2 = time.time()
    print('Agg1', t2 - t1)

    # Stage 3: the same pipeline on a tiny grid.
    agg = rasterize(3, int(3 * 7.0 / 12))
    t3 = time.time()
    print('Agg2', t3 - t2)

    client.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment