Created
October 10, 2014 11:41
-
-
Save mtambos/aa435461084b5c0025d1 to your computer and use it in GitHub Desktop.
RingBuffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
class RingBuffer(np.ndarray):
    """A multidimensional ring buffer built on top of ``numpy.ndarray``.

    The first axis is the buffer axis: new elements are written at the end
    and older elements are shifted towards the front, the oldest ones being
    dropped. The shape of the buffer never changes.
    """

    def __new__(cls, input_array):
        """Return ``input_array`` viewed as a ``RingBuffer``.

        The input goes through ``np.asarray``, so an existing ndarray is
        viewed rather than copied and later writes to the buffer are
        visible through the original array.
        """
        return np.asarray(input_array).view(cls)

    def __array_finalize__(self, obj):
        """NumPy creation hook; the class carries no extra state to set up."""
        if obj is None:
            return

    def __array_wrap__(self, out_arr, context=None, *args, **kwargs):
        """Wrap a ufunc result by delegating to ``ndarray.__array_wrap__``.

        NumPy >= 2.0 passes an additional ``return_scalar`` argument; any
        extra arguments are forwarded untouched so the hook works with both
        old and new NumPy versions.
        """
        return np.ndarray.__array_wrap__(self, out_arr, context,
                                         *args, **kwargs)

    def extend(self, xs):
        """Add the elements of array ``xs`` to the end of the ring buffer.

        If ``xs`` is longer than the ring buffer, only the last
        ``len(self)`` elements of ``xs`` are kept. An empty ``xs`` or a
        zero-length buffer leaves the buffer untouched.

        Raises:
            ValueError: if the element shape of ``xs`` (``xs.shape[1:]``)
                differs from the element shape of the buffer.
        """
        xs = np.asarray(xs)
        if self.shape[1:] != xs.shape[1:]:
            raise ValueError("Element's shape mismatch. RingBuffer.shape={}. "
                             "xs.shape={}".format(self.shape, xs.shape))
        len_self = len(self)
        len_xs = len(xs)
        # Guard against the ``-0`` slice pitfall: ``a[-0:]`` is the whole
        # array, not an empty tail, so zero lengths need an explicit exit.
        if len_self == 0 or len_xs == 0:
            return
        if len_xs >= len_self:
            # The new data replaces the whole buffer content.
            self[:] = xs[-len_self:]
        else:
            # Shift the surviving elements to the front, then write the
            # new ones into the freed tail.
            self[:-len_xs] = self[len_xs:]
            self[-len_xs:] = xs

    def append(self, x):
        """Add the single element ``x`` to the end of the ring buffer.

        The oldest element is dropped. Appending to a zero-length buffer
        stores nothing.

        Raises:
            ValueError: if ``x`` does not have the buffer's element shape
                (``self.shape[1:]``).
        """
        x = np.asarray(x)
        if self.shape[1:] != x.shape:
            raise ValueError("Element's shape mismatch. RingBuffer.shape={}. "
                             "x.shape={}".format(self.shape, x.shape))
        if len(self) == 0:
            return
        self[:-1] = self[1:]
        self[-1] = x
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import unittest | |
import numpy as np | |
import ring_buffer | |
class RingBufferTests(unittest.TestCase):
    """Tests for ``RingBuffer.extend`` and ``RingBuffer.append``.

    Every scenario is exercised with fewer, exactly as many, and more
    elements than the buffer holds, for both 1-d and n-d buffers. Each
    test method has a unique name: a duplicated name would silently
    replace the earlier test in the class body.
    """

    # ----- extend ---------------------------------------------------------

    def test_extend_less_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(8)
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_extend_less_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((8, 4))
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_extend_equal_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(10)
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_extend_equal_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((10, 4))
        ringbuff.extend(expected)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_extend_more_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(12)
        ringbuff.extend(expected)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())

    def test_extend_more_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((12, 4))
        ringbuff.extend(expected)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())

    # ----- append ---------------------------------------------------------

    def test_append_less_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(8)
        for element in expected:
            ringbuff.append(element)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_append_less_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((8, 4))
        for element in expected:
            ringbuff.append(element)
        self.assertEqual(expected.tolist(),
                         ringbuff[-len(expected):].tolist())

    def test_append_equal_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(10)
        for element in expected:
            ringbuff.append(element)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_append_equal_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((10, 4))
        for element in expected:
            ringbuff.append(element)
        self.assertEqual(expected.tolist(), ringbuff.tolist())

    def test_append_more_than_len_1d(self):
        # unidimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros(10))
        expected = np.random.random(12)
        for element in expected:
            ringbuff.append(element)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())

    def test_append_more_than_len_nd(self):
        # n-dimensional case
        ringbuff = ring_buffer.RingBuffer(np.zeros((10, 4)))
        expected = np.random.random((12, 4))
        for element in expected:
            ringbuff.append(element)
        self.assertEqual(expected[-len(ringbuff):].tolist(),
                         ringbuff.tolist())
# Run the whole test suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment