Created
March 6, 2025 20:01
-
-
Save beatzxbt/4b1a58954903c2580649eb5a8b7fd290 to your computer and use it in GitHub Desktop.
1d f64 ringbuffer.pyx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
cimport numpy as cnp | |
from libc.stdint cimport uint64_t, int64_t | |
cdef class RingBufferOneDim:
    """
    A 1-dimensional fixed-size circular buffer for floats/doubles.

    Elements are stored in a preallocated array of `capacity` doubles.
    `_left_index` is the slot of the oldest element, `_right_index` is the
    slot the next appended element is written to, and `_size` is the number
    of elements currently held. Appending to a full buffer overwrites the
    oldest element.
    """

    # NOTE(review): `_capacity`, `_left_index`, `_right_index`, `_size` and
    # `_buffer` are not declared in this file; presumably they are declared
    # in an accompanying .pxd -- confirm. The cpdef signatures below are kept
    # unchanged so they keep matching any such declaration.

    def __init__(self, uint64_t capacity):
        """
        Parameters:
            capacity (int): The maximum number of elements the buffer can hold.
                Must be greater than zero.

        Raises:
            ValueError: If `capacity` is zero.
        """
        # A zero-capacity buffer has no slot to write to, and every index
        # update below is taken modulo the capacity, so reject it up front.
        if capacity == 0:
            raise ValueError("RingBuffer capacity must be greater than zero.")

        self._capacity = capacity
        self._left_index = 0
        self._right_index = 0
        self._size = 0
        self._buffer = np.empty(capacity, dtype=np.double)

    cpdef cnp.ndarray raw(self):
        """
        Return a copy of the internal buffer array.

        Returns:
            np.ndarray: A copy of the internal 1D NumPy array representing the buffer's contents.

        Note:
            The returned array includes all allocated space, not just the filled elements,
            and is in storage order rather than oldest-to-newest order.
        """
        return np.asarray(self._buffer).copy()

    cpdef cnp.ndarray unsafe_raw(self):
        """
        Return a view of the internal buffer array without copying.

        Returns:
            np.ndarray: A NumPy array view of the buffer's internal data.

        Warning:
            Modifying the returned array may affect the buffer's internal state.
            Use with caution, as no copy is made.
        """
        return np.asarray(self._buffer)

    cpdef cnp.ndarray unwrapped(self):
        """
        Return a copy of the buffer's contents in the correct (unwrapped) order.

        Returns:
            np.ndarray: A 1D NumPy array containing the buffer's data in order from oldest to newest.
        """
        if self.is_empty():
            return np.empty(0, dtype=np.double)

        if self._left_index < self._right_index:
            # The live elements occupy one contiguous run.
            return np.asarray(self._buffer[self._left_index:self._right_index]).copy()

        # The live elements wrap past the end of the array. This also covers a
        # full buffer, where the left and right indices coincide.
        return np.concatenate((
            self._buffer[self._left_index:],
            self._buffer[:self._right_index]
        ))

    cpdef void unsafe_write(self, double value):
        """
        Directly write a value to the buffer at the current right index without updating indices.

        Parameters:
            value (float): The float value to be added to the buffer.

        Warning:
            This method does not check if the buffer is full and does not update buffer indices.
            It is intended for use in conjunction with `unsafe_push`. Use with caution to avoid data corruption.
        """
        self._buffer[self._right_index] = value

    cpdef void unsafe_push(self):
        """
        Advance the buffer indices after a value has been written at the current right index.

        If the buffer is already full, the left index is advanced as well, so the
        oldest element is dropped; otherwise the size grows by one.

        Warning:
            This method assumes that a value has already been written to the buffer at the
            current right index (see `unsafe_write`). Calling it without a preceding write
            exposes whatever value was previously stored in that slot.

        Note:
            This method is intended for use in conjunction with `unsafe_write` for performance
            optimization when you are certain that the buffer management is correct.
        """
        if self.is_full():
            # Overwrite mode: the oldest element is discarded.
            self._left_index = (self._left_index + 1) % self._capacity
        else:
            self._size += 1
        self._right_index = (self._right_index + 1) % self._capacity

    cpdef void append(self, double value):
        """
        Add a new element to the end of the buffer.

        If the buffer is full, the oldest element is overwritten.

        Parameters:
            value (float): The float value to be added to the buffer.
        """
        self.unsafe_write(value)
        self.unsafe_push()

    # NOTE(review): `popright` and `popleft` return a C double and declare no
    # explicit exception value; whether the IndexError reaches the caller may
    # depend on the Cython version's default exception propagation -- confirm.

    cpdef double popright(self):
        """
        Remove and return the last element from the buffer.

        Returns:
            float: The last value in the buffer.

        Raises:
            IndexError: If the buffer is empty.
        """
        if self._size == 0:
            raise IndexError("Cannot pop from an empty RingBuffer.")

        self._size -= 1
        # Add the capacity before subtracting one so the intermediate value
        # never goes below zero when the right index is 0.
        self._right_index = (self._right_index + self._capacity - 1) % self._capacity
        return self._buffer[self._right_index]

    cpdef double popleft(self):
        """
        Remove and return the first element from the buffer.

        Returns:
            float: The first value in the buffer.

        Raises:
            IndexError: If the buffer is empty.
        """
        cdef double value

        if self._size == 0:
            raise IndexError("Cannot pop from an empty RingBuffer.")

        value = self._buffer[self._left_index]
        self._left_index = (self._left_index + 1) % self._capacity
        self._size -= 1
        return value

    cpdef cnp.ndarray reset(self):
        """
        Clear the buffer and reset it to its initial state.

        Returns:
            np.ndarray: A copy of the buffer's contents (oldest to newest) before resetting.
        """
        # Take the ordered snapshot first; `fast_reset` zeroes the storage.
        result = self.unwrapped()
        self.fast_reset()
        return result

    cpdef void fast_reset(self):
        """
        Reset the buffer to its initial state without returning data.

        Note:
            This method zero-fills the underlying storage and resets the indices and size.
            It does not return the previous data.
        """
        self._buffer[:] = 0.0
        self._left_index = 0
        self._right_index = 0
        self._size = 0

    cpdef bint is_full(self):
        """
        Check if the buffer is full.

        Returns:
            bool: True if the buffer is full, False otherwise.
        """
        return self._size == self._capacity

    cpdef bint is_empty(self):
        """
        Check if the buffer is empty.

        Returns:
            bool: True if the buffer is empty, False otherwise.
        """
        return self._size == 0

    def __contains__(self, double value):
        """
        Check if a value is present in the buffer.

        Only the live elements are searched, using exact floating-point equality.

        Parameters:
            value (float): The value to search for.

        Returns:
            bool: True if the value is in the buffer, False otherwise.
        """
        cdef uint64_t i, idx

        # An empty buffer simply runs zero iterations.
        for i in range(self._size):
            idx = (self._left_index + i) % self._capacity
            if self._buffer[idx] == value:
                return True
        return False

    def __iter__(self):
        """
        Iterate over the elements in the buffer in order from oldest to newest.

        Yields:
            float: Each value in the buffer.
        """
        cdef uint64_t idx = self._left_index

        for _ in range(self._size):
            yield self._buffer[idx]
            idx = (idx + 1) % self._capacity

    def __len__(self):
        """
        Get the number of elements currently in the buffer.

        Returns:
            int: The current size of the buffer.
        """
        return self._size

    def __getitem__(self, int64_t idx):
        """
        Get the element at the given logical index (0 is the oldest element).

        Negative indices count from the newest element, as with Python sequences.

        Parameters:
            idx (int): The index of the element to retrieve.

        Returns:
            float: The value at the specified index.

        Raises:
            IndexError: If the index is out of range.
        """
        # `idx` is a 64-bit integer so that large out-of-range indices are
        # reported as IndexError instead of failing the C-int conversion.
        cdef int64_t size = self._size
        cdef int64_t slot

        if idx < 0:
            idx += size
        if idx < 0 or idx >= size:
            raise IndexError("Index out of range.")

        # Translate the logical position into a physical slot.
        slot = (self._left_index + idx) % self._capacity
        return self._buffer[slot]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment