Skip to content

Instantly share code, notes, and snippets.

@danstowell
Created April 26, 2018 09:38
Show Gist options
  • Save danstowell/324a52e94a50b5b0c24a809c1d72abfb to your computer and use it in GitHub Desktop.
Save danstowell/324a52e94a50b5b0c24a809c1d72abfb to your computer and use it in GitHub Desktop.
Algorithm to sample random rows from a matrix that is too large to store in memory (Reservoir Random Sampling, Algorithm "R") https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R
# by Dan Stowell 2014.
# This particular file implementing ReservoirSampleMatrix is published under the MIT Public Licence
# http://opensource.org/licenses/MIT
import numpy as np
import random
class ReservoirSampleMatrix:
"""This class helps you to take a uniform random sample from streaming data, using the 'Reservoir Sampling' technique.
How to use:
(1) initialise it with args (nrows, ncols), where nrows corresponds to the (max) size of the data sample you want, and ncols corresponds to your data dimensionality.
(2) push your data through, as a series of matrices (one row per datum), by calling append() whenever you have a chunk of data ready.
(3) call getsample() to get a matrix of subsampled data.
Note that all the memory needed is allocated when the object is first instantiated - meaning that if you initialise for a million rows,
but only supply ten rows, the object still takes up memory as if it contained a million rows. Generally, of course, you will supply
much more data than nrows.
Here is a code sample, using some rather bizarre data made from identity matrices:
from ReservoirSampleMatrix import ReservoirSampleMatrix
import numpy as np
rsm = ReservoirSampleMatrix(10, 5)
for scale in range(10):
e = np.eye(5) * scale
rsm.append(e)
rsm.getsample()
"""
def __init__(self, nrowsmax, ncols):
self._data = np.zeros((nrowsmax, ncols))
self.nrowsmax = nrowsmax
self.nrowsused = 0
def len(self):
return self.nrowsused
def getsample(self):
"Returns the accumulated random sample. Please note: repeated calls to this method will not generate independent random samples."
if self.nrowsused < self.nrowsmax:
return self._data[:self.nrowsused, :]
else:
return self._data
def append(self, matrix):
"pushes a matrix representing another batch of the data stream through. some subset of the rows will be stored as the random sample is accumulated."
for row in matrix:
if self.nrowsused < self.nrowsmax:
self._data[self.nrowsused] = row
self.nrowsused += 1
else:
self.nrowsused += 1 # pre-increment simplies the main calculation
randpos = random.randint(0, self.nrowsused) # upper limit inclusive not exclusive
if randpos < self.nrowsmax:
self._data[randpos] = row
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment