Skip to content

Instantly share code, notes, and snippets.

@tritemio
Last active January 2, 2016 10:59
Show Gist options
  • Save tritemio/8293590 to your computer and use it in GitHub Desktop.
Save tritemio/8293590 to your computer and use it in GitHub Desktop.
Numpy out-of-core processing study
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#Python particles simulator: numpy out-of-core processing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> This notebook benchmarks different solutions to [this Stackoverflow question](http://stackoverflow.com/questions/20940805/python-particles-simulator-out-of-core-processing)."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from itertools import izip\n",
"import pandas as pd\n",
"import numpy as np\n",
"import tables as tb"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'PyTables version:', tb.__version__\n",
"print 'Pandas version:', pd.__version__"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"PyTables version: 3.0.0\n",
"Pandas version: 0.12.0\n"
]
}
],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"n_particles = 15\n",
"time_chunk_size = 2**18\n",
"n_iter = 20\n",
"time_size = n_iter*time_chunk_size\n",
"print 'time size: %.1f 10^6' % (time_size*1e-6)\n",
"print 'emission size: %.1f MB' % (4*n_particles*time_size/1024./1024.)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"time size: 5.2 10^6\n",
"emission size: 300.0 MB\n"
]
}
],
"prompt_number": 3
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##Utility Functions"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def generate_emission(shape):\n",
" \"\"\"Generate fake emission.\"\"\"\n",
" emission = np.random.randn(*shape).astype('float32') - 1\n",
" emission.clip(0, 1e6, out=emission)\n",
" assert (emission >=0).all() \n",
" return emission"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def compute_counts(emission):\n",
" \"\"\"Fake counts computation\"\"\"\n",
" return np.trunc(emission*1.5).astype('i1')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 5
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##Simulate \"`emission`\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Time needed to generate the data (in-RAM storage):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"# generate simulated data\n",
"test = []\n",
"for i in range(n_iter):\n",
" emission_chunk = generate_emission((time_chunk_size, n_particles))\n",
" test.append(emission_chunk)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 3.4 s per loop\n"
]
}
],
"prompt_number": 6
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Time needed to save the data to a file with **pytables**:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#data_file.close()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"data_file = tb.open_file('emission_pytables.hdf', mode = \"w\")"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 7
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"# 1) create a new emission file, compressing as we go\n",
"comp_filter = tb.Filters(complib='blosc', complevel=5)\n",
"emission = data_file.create_earray('/', 'emission', atom=tb.Float32Atom(),\n",
" shape = (n_particles, 0),\n",
" chunkshape = (n_particles, time_chunk_size),\n",
" expectedrows = n_iter*time_chunk_size,\n",
" filters = comp_filter)\n",
"\n",
"# generate simulated emission data\n",
"for i in range(n_iter):\n",
" emission_chunk = generate_emission((n_particles, time_chunk_size))\n",
" emission.append(emission_chunk)\n",
"\n",
"emission.flush()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 12.3 s per loop\n"
]
}
],
"prompt_number": 8
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"File size: 135.1 MB \n"
]
}
],
"prompt_number": 9
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"emission = data_file.root.emission"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 10
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'Compression ratio: %.1f x' % ((1.*emission.size_in_memory/emission.size_on_disk))"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Compression ratio: 2.2 x\n"
]
}
],
"prompt_number": 11
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 11
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Time needed to save the data to a file with **pandas** ([Jeff version](http://stackoverflow.com/questions/20940805/python-particles-simulator-out-of-core-processing)):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"# 1) create a new emission file, compressing as we go\n",
"emission = pd.HDFStore('emission_pandas.hdf', mode='w', complib='blosc', complevel=5)\n",
"\n",
"# generate simulated data\n",
"for i in range(n_iter):\n",
"\n",
" # Generate fake emission\n",
" emission_chunk = np.random.randn(time_chunk_size, n_particles) - 1\n",
" emission_chunk.clip(0, 1e6, out=emission_chunk)\n",
" assert (emission_chunk >=0).all()\n",
"\n",
" df = pd.DataFrame(emission_chunk, dtype='float32')\n",
"\n",
" # create a globally unique index (time)\n",
" # http://stackoverflow.com/questions/16997048/how-does-one-append-large-\n",
" # amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397\n",
" try:\n",
" nrows = emission.get_storer('df').nrows\n",
" except:\n",
" nrows = 0\n",
"\n",
" df.index = pd.Series(df.index) + nrows\n",
" emission.append('df', df)\n",
"\n",
"emission.close()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 41.7 s per loop\n"
]
}
],
"prompt_number": 12
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#emission.close()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 13
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##Simulate \"`timestamps`\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###All-in-memory"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Compute the counts:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"counts_ram = compute_counts(emission.read())"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 14
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"counts_ram = compute_counts(emission.read())"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 31.2 s per loop\n"
]
}
],
"prompt_number": 15
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"####Compute timestamps. \n",
"\n",
"When there are more than 1 count per bin I add a \"fraction of bin\" to the timestamp in order to avoid photon bunching. The timestamp resolution is increased 10 times so that integers can represent fractions of the initial time-bin.\n",
"\n",
"Lists are defined outside the **timeit** loop, otherwise they are not saved:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]\n",
"scale = 10\n",
"max_counts = 4\n",
"\n",
"t_list = [[] for i in xrange(n_particles)]\n",
"timestamps_ram = []"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 16
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"for p_i, counts_p_i in enumerate(counts_ram.copy()):\n",
" t_list[p_i].append(counts_p_i.nonzero()[0]*scale)\n",
" for frac, v in izip(fractions, range(2, max_counts + 1)):\n",
" counts_p_i -= 1\n",
" np.clip(counts_p_i, 0, 127, out=counts_p_i)\n",
" t_list[p_i].append(counts_p_i.nonzero()[0]*scale + frac)\n",
"\n",
"for t in t_list: timestamps_ram.append(np.hstack(t))\n",
"[t.sort(kind='mergesort') for t in timestamps_ram]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 3.7 s per loop\n"
]
}
],
"prompt_number": 17
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here we create a single sorted array (and an array of particles) from the list of timestamps"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"par_index = [p_i*np.ones(t.size) for p_i, t in enumerate(timestamps_ram)]\n",
"timestamps_all_ram = np.hstack(timestamps_ram)\n",
"par_index_all_ram = np.hstack(par_index)\n",
"index_sort = timestamps_all_ram.argsort(kind='mergesort')\n",
"timestamps_all_ram = timestamps_all_ram[index_sort]\n",
"par_index_all_ram = par_index_all_ram[index_sort]"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 18
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"These are only consistency checks:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print [t.shape for t in timestamps_ram]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[(310020,), (309226,), (308675,), (309585,), (310489,), (309850,), (310068,), (309547,), (310329,), (310280,), (309849,), (309377,), (309851,), (310024,), (308760,)]\n"
]
}
],
"prompt_number": 19
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for p_i in xrange(n_particles):\n",
" print (timestamps_ram[p_i] == timestamps_all_ram[par_index_all_ram == p_i]).all(),"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True "
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True\n"
]
}
],
"prompt_number": 20
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"idx_ram = [t/scale for t in timestamps_ram]\n",
"idx_ram"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 21,
"text": [
"[array([ 30, 86, 126, ..., 5242816, 5242857, 5242879]),\n",
" array([ 2, 36, 45, ..., 5242706, 5242726, 5242752]),\n",
" array([ 36, 36, 57, ..., 5242852, 5242853, 5242853]),\n",
" array([ 1, 1, 1, ..., 5242830, 5242842, 5242877]),\n",
" array([ 21, 33, 33, ..., 5242694, 5242849, 5242849]),\n",
" array([ 26, 26, 47, ..., 5242814, 5242835, 5242871]),\n",
" array([ 34, 36, 52, ..., 5242852, 5242852, 5242852]),\n",
" array([ 0, 10, 41, ..., 5242856, 5242879, 5242879]),\n",
" array([ 5, 33, 70, ..., 5242859, 5242870, 5242879]),\n",
" array([ 15, 17, 45, ..., 5242872, 5242872, 5242874]),\n",
" array([ 12, 45, 50, ..., 5242875, 5242875, 5242876]),\n",
" array([ 52, 52, 55, ..., 5242834, 5242834, 5242834]),\n",
" array([ 20, 44, 53, ..., 5242693, 5242804, 5242835]),\n",
" array([ 54, 61, 61, ..., 5242869, 5242876, 5242876]),\n",
" array([ 30, 52, 52, ..., 5242837, 5242845, 5242845])]"
]
}
],
"prompt_number": 21
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print [(np.clip(c, 0, max_counts).sum() == t.size) for c, t in zip(counts_ram, timestamps_ram)]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]\n"
]
}
],
"prompt_number": 22
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"[c[i] for c, i in zip(counts_ram, idx_ram)]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 23,
"text": [
"[array([1, 1, 1, ..., 1, 1, 1], dtype=int8),\n",
" array([1, 1, 1, ..., 1, 1, 1], dtype=int8),\n",
" array([2, 2, 1, ..., 1, 2, 2], dtype=int8),\n",
" array([3, 3, 3, ..., 1, 1, 1], dtype=int8),\n",
" array([1, 3, 3, ..., 1, 2, 2], dtype=int8),\n",
" array([2, 2, 1, ..., 1, 1, 1], dtype=int8),\n",
" array([1, 1, 2, ..., 3, 3, 3], dtype=int8),\n",
" array([1, 1, 1, ..., 1, 2, 2], dtype=int8),\n",
" array([1, 1, 1, ..., 1, 1, 1], dtype=int8),\n",
" array([1, 1, 1, ..., 2, 2, 1], dtype=int8),\n",
" array([1, 1, 1, ..., 2, 2, 1], dtype=int8),\n",
" array([2, 2, 1, ..., 3, 3, 3], dtype=int8),\n",
" array([1, 1, 1, ..., 1, 1, 1], dtype=int8),\n",
" array([1, 2, 2, ..., 1, 2, 2], dtype=int8),\n",
" array([1, 2, 2, ..., 1, 2, 2], dtype=int8)]"
]
}
],
"prompt_number": 23
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###PyTables: store `counts`, then compute `timestamps`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each particle has a different table of \"counts\".\n",
"\n",
"Let's create the tables on disk:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"counts_names = [\"counts_p%d\" % i for i in xrange(n_particles)]"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 12
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#for i_p in xrange(n_particles): data_file.remove_node('/c', counts_names[i_p])"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 13
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"data_file.close()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 14
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"data_file = tb.open_file('emission_pytables.hdf', mode=\"a\")\n",
"emission = data_file.root.emission"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 15
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"File size: 135.1 MB \n"
]
}
],
"prompt_number": 16
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Create the tables for the counts\n",
"comp_filter = tb.Filters(complib='blosc', complevel=5)\n",
"table_uint8 = np.dtype([('counts', 'u1')])\n",
"\n",
"counts_p = []\n",
"for i_p in xrange(n_particles):\n",
" # 2) create counts table\n",
" counts_p.append(data_file.create_table('/c', counts_names[i_p], createparents=True,\n",
" description=table_uint8,\n",
" chunkshape = time_chunk_size,\n",
" expectedrows = n_iter*time_chunk_size,\n",
" filters = comp_filter)\n",
" )"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 17
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The table references are stored in a list `counts_p`:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"counts_p[0]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 18,
"text": [
"/c/counts_p0 (Table(0,), shuffle, blosc(5)) ''\n",
" description := {\n",
" \"counts\": UInt8Col(shape=(), dflt=0, pos=0)}\n",
" byteorder := 'little'\n",
" chunkshape := (262144,)"
]
}
],
"prompt_number": 18
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Compute the counts and store them on disk:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -n1 -r1\n",
"# Extract the counts from the emission\n",
"for i_chunk in xrange(n_iter):\n",
" emission_chunk = emission[:, i_chunk*time_chunk_size:i_chunk*time_chunk_size+time_chunk_size]\n",
" #print emission_chunk.shape\n",
" for p_i in xrange(n_particles):\n",
" counts_chunk_p_i = compute_counts(emission_chunk[p_i])\n",
" counts_p[p_i].append(counts_chunk_p_i)\n",
"\n",
"data_file.flush()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 1.54 s per loop\n"
]
}
],
"prompt_number": 19
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> ~20% faster than in-memory counts computation!!"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"File size: 147.6 MB \n"
]
}
],
"prompt_number": 20
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"counts_p[0]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 21,
"text": [
"/c/counts_p0 (Table(5242880,), shuffle, blosc(5)) ''\n",
" description := {\n",
" \"counts\": UInt8Col(shape=(), dflt=0, pos=0)}\n",
" byteorder := 'little'\n",
" chunkshape := (262144,)"
]
}
],
"prompt_number": 21
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For testing purposes, load all the counts and compare with the in-memory results:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"counts_tb = np.vstack([c.read() for c in counts_p]).astype('i1')\n",
"counts_tb.sum(1)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 22,
"text": [
"array([308778, 310327, 310591, 309985, 310035, 310663, 309909, 310130,\n",
" 310264, 310064, 309592, 311289, 309805, 309989, 310662])"
]
}
],
"prompt_number": 22
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"counts_ram.sum(1)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 33,
"text": [
"array([311238, 308140, 310120, 309996, 309671, 310261, 310743, 309631,\n",
" 310158, 309335, 309438, 310189, 309709, 309499, 309755])"
]
}
],
"prompt_number": 33
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"(counts_tb == counts_ram).all()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 34,
"text": [
"True"
]
}
],
"prompt_number": 34
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"####Compute timestamps\n",
"\n",
"When there are more than 1 count per bin I add a \"fraction of bin\" to the timestamp in order to avoid photon bunching. The timestamp resolution is increased 10 times so that integers can represent fractions of the initial time-bin.\n",
"\n",
"Lists are defined outside the **timeit** loop, otherwise they are not saved:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Compute timestamps from the counts\n",
"fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]\n",
"scale = 10\n",
"max_counts = 4\n",
"\n",
"timestamps_tb = []"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 23
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -n1 -r1\n",
"for counts_p_i in counts_p:\n",
" PH = [counts_p_i.get_where_list('counts >= 1')]\n",
" PH[0] *= scale\n",
" for frac, v in izip(fractions, range(2, max_counts + 1)):\n",
" PH.append(counts_p_i.get_where_list('counts >= %d' % v)*scale)\n",
" PH[-1] += frac\n",
" t = np.hstack(PH).astype(np.int64)\n",
" t.sort(kind='mergesort')\n",
" timestamps_tb.append(t)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 4.29 s per loop\n"
]
}
],
"prompt_number": 24
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Consistency checks:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print [(t1.size == t2.size) for t1, t2 in zip(timestamps_tb, timestamps_ram)]"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 58
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print [(t1 == t2).all() for t1, t2 in zip(timestamps_tb, timestamps_ram)]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]\n"
]
}
],
"prompt_number": 38
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###PyTables: compute `timestamps` (without storing `counts`)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This computes \"`timestamps`\" in a list, without the final merge:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]\n",
"scale = 10\n",
"max_counts = 4\n",
"\n",
"times_list = [[] for i in xrange(n_particles)]"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 102
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"\n",
"# Load emission in chunks, and save only the final timestamps\n",
"for i_chunk in xrange(n_iter):\n",
" i_start = i_chunk*time_chunk_size\n",
" emission_chunk = emission[:, i_start:i_start + time_chunk_size]\n",
" counts_chunk = compute_counts(emission_chunk)\n",
" index = np.arange(0, counts_chunk.shape[1])\n",
" \n",
" # Loop for each particle to compute timestamps\n",
" for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):\n",
" times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]\n",
" for frac, v in izip(fractions, range(2, max_counts + 1)):\n",
" times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)\n",
" \n",
" t = np.hstack(times_c_i)\n",
" t.sort(kind='mergesort')\n",
" times_list[p_i].append(t)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 2.26 s per loop\n"
]
}
],
"prompt_number": 103
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> ~30% faster than computation from disk (when \"`counts`\" are stored)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Consistency checks:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"timestamps_tb2 = [np.hstack(t) for t in times_list]\n",
"print [(t1.size == t2.size) for t1, t2 in zip(timestamps_tb2, timestamps_ram)]\n",
"print [(t1 != t2).sum() for t1, t2 in zip(timestamps_tb2, timestamps_ram)]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]\n",
"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]\n"
]
}
],
"prompt_number": 104
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This computes \"`timestamps`\" in a list, then merge them (`hstack`) in a single array:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]\n",
"scale = 10\n",
"max_counts = 4\n",
"\n",
"timestamps_list_all_tb = []\n",
"par_list_all_tb = []"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 33
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"\n",
"# Load emission in chunks, and save only the final timestamps\n",
"for i_chunk in xrange(n_iter):\n",
" i_start = i_chunk*time_chunk_size\n",
" emission_chunk = emission[:, i_start:i_start + time_chunk_size]\n",
" counts_chunk = compute_counts(emission_chunk)\n",
" index = np.arange(0, counts_chunk.shape[1])\n",
" \n",
" # Loop for each particle to compute timestamps\n",
" times_chunk = [[] for i in xrange(n_particles)]\n",
" par_index_chunk = [[] for i in xrange(n_particles)]\n",
" for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):\n",
" times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]\n",
" for frac, v in izip(fractions, range(2, max_counts + 1)):\n",
" times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)\n",
" \n",
" # Stack the arrays of different \"counts\"\n",
" t = np.hstack(times_c_i)\n",
" times_chunk[p_i] = t\n",
" par_index_chunk[p_i] = p_i*np.ones(t.size, dtype='u1')\n",
" \n",
" # Merge the arrays of different particles\n",
" times_all = np.hstack(times_chunk)\n",
" par_index_all = np.hstack(par_index_chunk)\n",
" \n",
" # Sort timestamps inside the merged chunk\n",
" index_sort = times_all.argsort(kind='mergesort')\n",
" times_all = times_all[index_sort]\n",
" par_index_all = par_index_all[index_sort]\n",
" \n",
" # Save timestamps and particles\n",
" timestamps_list_all_tb.append(times_all)\n",
" par_list_all_tb.append(par_index_all)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 5.06 s per loop\n"
]
}
],
"prompt_number": 34
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Make union of the chunks\n",
"timestamps_all_tb = np.hstack(timestamps_list_all_tb)\n",
"par_all_tb = np.hstack(par_list_all_tb)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 35
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Consistency checks:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"timestamps_all_tb.shape, timestamps_all_ram.shape"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 36,
"text": [
"((4645930,), (4645930,))"
]
}
],
"prompt_number": 36
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print (timestamps_all_tb == timestamps_all_ram).all()\n",
"print (par_all_tb == par_index_all_ram).all()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True\n",
"True"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"\n"
]
}
],
"prompt_number": 37
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###PyTables: compute \"`timestamps`\", storing them on disk on-the-go"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"#data_file.remove_node('/ts', 'timestamps_all')\n",
"#data_file.remove_node('/ts', 'par_all')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 109
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"data_file.close()"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 41
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"data_file = tb.open_file('emission_pytables.hdf', mode=\"a\")\n",
"emission = data_file.root.emission"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 42
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"File size: 135.1 MB \n"
]
}
],
"prompt_number": 43
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Create the table for the timestamps_all\n",
"comp_filter = tb.Filters(complib='blosc', complevel=5)\n",
"table_int64 = np.dtype([('times', 'i8')])\n",
"table_uint8 = np.dtype([('particle', 'i1')])\n",
"\n",
"timestamps_all_table = data_file.create_table('/ts', 'timestamps_all', createparents=True,\n",
" description=table_int64, filters = comp_filter)\n",
"par_all_table = data_file.create_table('/ts', 'par_all', createparents=True,\n",
" description=table_uint8, filters = comp_filter)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 44
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]\n",
"scale = 10\n",
"max_counts = 4"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 45
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%timeit -r1 -n1\n",
"\n",
"# Load emission in chunks, and save only the final timestamps\n",
"for i_chunk in xrange(n_iter):\n",
" i_start = i_chunk*time_chunk_size\n",
" emission_chunk = emission[:, i_start:i_start + time_chunk_size]\n",
" counts_chunk = compute_counts(emission_chunk)\n",
" index = np.arange(0, counts_chunk.shape[1])\n",
" \n",
" # Loop for each particle to compute timestamps\n",
" times_chunk = [[] for i in xrange(n_particles)]\n",
" par_index_chunk = [[] for i in xrange(n_particles)]\n",
" for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):\n",
" times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]\n",
" for frac, v in izip(fractions, range(2, max_counts + 1)):\n",
" times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)\n",
" \n",
" # Stack the arrays of different \"counts\"\n",
" t = np.hstack(times_c_i)\n",
" times_chunk[p_i] = t\n",
" par_index_chunk[p_i] = p_i*np.ones(t.size, dtype='u1')\n",
" \n",
" # Merge the arrays of different particles\n",
" times_all = np.hstack(times_chunk)\n",
" par_index_all = np.hstack(par_index_chunk)\n",
" \n",
" # Sort timestamps inside the merged chunk\n",
" index_sort = times_all.argsort(kind='mergesort')\n",
" times_all = times_all[index_sort]\n",
" par_index_all = par_index_all[index_sort]\n",
" \n",
" # Save timestamps and particles\n",
" timestamps_all_table.append(times_all)\n",
" par_all_table.append(par_index_all)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"1 loops, best of 1: 5.3 s per loop\n"
]
}
],
"prompt_number": 46
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Make union of the chunks\n",
"timestamps_all_tb3 = timestamps_all_table.read().astype('int64')\n",
"par_all_tb3 = par_all_table.read().astype('uint8')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 54
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"File size: 145.0 MB \n"
]
}
],
"prompt_number": 48
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Consistency checks:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"timestamps_all_tb3.shape, timestamps_all_ram.shape"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 55,
"text": [
"((4645930,), (4645930,))"
]
}
],
"prompt_number": 55
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"print (timestamps_all_tb3 == timestamps_all_ram).all()\n",
"print (par_all_tb3 == par_index_all_ram).all()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"True\n",
"True\n"
]
}
],
"prompt_number": 58
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##Conclusions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Storing \"`counts`\"\n",
"Storing counts will take 10% more space and 40% more time to compute timestamps. Having `counts` stored is not an advantage per-se because only the timestamps are needed in the end. \n",
"\n",
"The advantage is that reconstructing the index (timestamps) is simpler because we query the full time axis in a single command (`.get_where_list('counts >= 1')`). Conversely, with chunked processing, we need to perform some index arithmetic that is tricky, and maybe a burden to maintain.\n",
"\n",
"However the code complexity may be small compared to the sorting and merging that is needed in both cases.\n",
"\n",
"### Storing \"`timestamps`\"\n",
"\n",
"Timestamps can be accumulated in RAM. However a final `hstack()` is needed to \"merge\" the different chunks stored in a list. This doubles the memory requirements so the RAM may be insufficient.\n",
"\n",
"We can store as-we-go timestamps to a table using `.append()`. At the end we can load the table in memory with `.read()`. This is only 10% slower than all-in-memory computation but avoids the \"double the RAM\" requirement. Moreover we can avoid the final full load resulting in minimal RAM usage.\n",
"\n",
"### H5Py\n",
"\n",
"**H5py** is a much simpler library than pytables. For this use case of sequential processing it seems a better fit than pytables. The only missing feature is the lack of 'blosc' compression. It must be tested whether this results in a big performance penalty."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###Simulate \"`counts`\" with pandas"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Solution suggested by **Jeff** [here](http://stackoverflow.com/questions/20940805/python-particles-simulator-out-of-core-processing):"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> **WARNING** this takes several times longer than the pytables approach; not useful in real situations"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# generate simulated data\n",
"for i in range(10):\n",
" # 2) create counts\n",
" cs = pd.HDFStore('counts.hdf', mode='w', complib='blosc')\n",
"\n",
" # this is an iterator, can be any size\n",
" for df in pd.read_hdf('emission_pandas.hdf', 'df',chunksize=200):\n",
"\n",
" counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))\n",
"\n",
" # set the index as the same\n",
" counts.index = df.index\n",
"\n",
" # store the sum across all particles (as most are zero this will be a\n",
" # nice sub-selector\n",
" # better maybe to have multiple of these sums that divide the particle space\n",
" # you don't have to do this but prob more efficient\n",
" # you can do this in another file if you want/need\n",
" counts['particles_0_4'] = counts.iloc[:,0:4].sum(1).astype('u1')\n",
" counts['particles_5_9'] = counts.iloc[:,5:9].sum(1).astype('u1')\n",
"\n",
" # make the non_zero column indexable\n",
" cs.append('df', counts,data_columns=['particles_0_4','particles_5_9'])\n",
"\n",
" cs.close()\n",
"\n",
" # 3) find interesting counts\n",
" print pd.read_hdf('counts.hdf','df',where='particles_0_4>0')\n",
" print pd.read_hdf('counts.hdf','df',where='particles_5_9>0')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 63
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 62
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment