Last active
November 2, 2019 19:38
-
-
Save pbabics/ef023652712c3d2dec2fc2747d253b29 to your computer and use it in GitHub Desktop.
Simple AggregatedTimeSeries storage which allows fast retrieval of aggregated value. Aggregating function is defined by two functions, binary function `op` and inverse function `inv`.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <vector> | |
#include <exception> | |
/**
 * Raised by AggregatedTimeSeries when an operation that needs at least
 * one stored element (first/last value, aggregate) is invoked while
 * the storage holds no entries.
 */
struct EmptyTimeSeries: public std::exception {
    /** @return static, human readable description of the error */
    virtual const char* what() const throw ()
    {
        return "TimeSeries storage is empty";
    }
};
/**
 * Raised by AggregatedTimeSeries when an aggregate is requested for
 * a time frame which starts after the newest element in storage,
 * i.e. the requested range contains no entries.
 */
struct EmptyTimeRange: public std::exception {
    /** @return static, human readable description of the error */
    virtual const char* what() const throw ()
    {
        return "Requested Time range is empty";
    }
};
// Signed integer used for storage positions; negative values encode "not found" results.
typedef signed long long int int64;
// Unsigned integer used for element counts.
typedef unsigned long long int uint64;
/**
 * Single entry of an aggregated time series: the time of the entry,
 * the value stored at that time and the aggregate attached to it.
 *
 * @tparam V represents type of value and aggregated value
 * @tparam T represents type of time entry
 */
template <class V, class T>
struct AggregatedTimeSeriesItem {
    T time;       ///< time index of the entry
    V value;      ///< value stored for this time
    V aggregate;  ///< aggregated value associated with this entry

    /**
     * @param entry_time time index of the entry
     * @param entry_value value stored for this time
     * @param entry_aggregate aggregated value associated with this entry
     */
    AggregatedTimeSeriesItem(const T& entry_time, const V& entry_value, const V& entry_aggregate)
        : time(entry_time)
        , value(entry_value)
        , aggregate(entry_aggregate)
    {
    }
};
/** | |
* Class used for representing aggregated time series | |
* | |
* Entries may be inserted out for order, during insert they are properly inserted | |
* and aggregated value is calcualted. | |
* Aggregating function is defined in class constructor. Actually two functions are required | |
* One which aggregates two values (op), and other which is able to create inverse value (inv). | |
* | |
* Aggregation is done during insertion to allow fast aggregate retrieval. | |
* If element is inserted into inserted on first position, aggregated value is equal to element value | |
* and rest of storage is updated accordingly, adding new value to the rest of aggregates | |
* | |
* If element is inserted into middle of the storage, aggregated value up to point of insertion | |
* is used as `previous_aggregated_value`, newly inserted element than has aggregated value equal to | |
* `op(previous_aggregated_value, new_value)`. Rest of storage is again updated accordingly. | |
* | |
* When aggregate is requested value is calculated as | |
* `op(aggregate from last storage entry, inv(aggregate from index right before requested time frame))` | |
* | |
* @tparam V represents type of value and aggregated value | |
* @tparam T represents type of time entry | |
* @tparam TimeSeriesStorage represents type of storage used for time serie | |
* default type is std::vector | |
*/ | |
template < | |
typename V, | |
typename T, | |
typename TimeSeriesStorage = std::vector<AggregatedTimeSeriesItem<V, T> > | |
> | |
class AggregatedTimeSeries { | |
TimeSeriesStorage _storage; | |
const V (*_op) (const V&, const V&); | |
const V (*_inv) (const V&); | |
public: | |
/** | |
* Aggragated Time Serie constructor. | |
* | |
* Takes two arguments, first one is function defining | |
* aggregation of two values, second is function | |
* which creates inverse value. These two functions are | |
* used to effectively calculate aggregate of values in | |
* different time ranges. See class description for detail | |
* | |
* @param op Binary function defining aggregation of two values, e.g `a + b` | |
* @param inv Unary function creating inverse value, e.g `- a` | |
*/ | |
AggregatedTimeSeries( | |
const V (*op) (const V&, const V&), | |
const V (*inv) (const V&) | |
); | |
virtual ~AggregatedTimeSeries() { } | |
/** | |
* Returns aggregated value of all values stored in time serie storage | |
* | |
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie | |
*/ | |
V getAggregate() const; | |
/** | |
* @param since Minimal time which should be incorporated in aggregated value | |
* @return aggregated value of values stored in time serie storage | |
* which have time index greater or equal to ``since`` parameter. | |
* | |
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie | |
* @throws EmptyTimeRange Exception is raised when method is called with ``since`` | |
* higher than time of newest value | |
*/ | |
V getAggregate(const T& since) const; | |
/** | |
* @return count of all values stored in time serie storage | |
*/ | |
uint64 getCount() const; | |
/** | |
* @return count of values stored in time serie storage | |
* which have time index greater or equal to ``since`` parameter. | |
* @param since Minimal time which should be incorporated in aggregated value | |
*/ | |
uint64 getCount(const T& since) const; | |
/** | |
* @return value of first (oldest) element stored in time serie storage | |
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie | |
*/ | |
const V& getLastValue() const; | |
/** | |
* @return value of last (newest) element stored in time serie storage | |
* @throws EmptyTimeSeries Exception is raised when method is called on empty time serie | |
*/ | |
const V& getFirstValue() const; | |
/** | |
* Inserts new element into time serie, updating aggregated values | |
* of stored entries if necessary | |
* | |
* @param value to be inserted into time serie stoarge | |
* @param time at which value should be inserted | |
*/ | |
void insert(const V& value, const T& time); | |
/** | |
* Clears entire time serie storage | |
*/ | |
void clear(); | |
/** | |
* Removes entries older than ``until`` from time serie storage | |
* | |
* @param until all elements which time value lower than this will be removed | |
*/ | |
void flush(const T& until); | |
private: | |
int64 _find_insertion_point(const T& time) const; | |
}; | |
template <class V, class T, class TimeSeriesStorage> | |
AggregatedTimeSeries<V, T, TimeSeriesStorage>::AggregatedTimeSeries( | |
const V (*op) (const V&, const V&), | |
const V (*inv) (const V&) | |
) : | |
_op(op), _inv(inv) | |
{ | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
V AggregatedTimeSeries<V, T, TimeSeriesStorage>::getAggregate() const | |
{ | |
if (!_storage.size()) | |
throw EmptyTimeSeries(); | |
return _op( | |
_op(_storage.back().aggregate, _inv(_storage.front().aggregate)), | |
_storage.front().value | |
); | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
V AggregatedTimeSeries<V, T, TimeSeriesStorage>::getAggregate(const T& since) const | |
{ | |
if (!_storage.size()) | |
throw EmptyTimeSeries(); | |
int64 index = _find_insertion_point(since); | |
if (index < 0) { | |
index = -index - 1; | |
if (index == _storage.size()) | |
throw EmptyTimeRange(); | |
else if (index == 0) | |
return getAggregate(); | |
else | |
return _op(_storage.back().aggregate, _inv(_storage[index].aggregate)); | |
} | |
else if (index == 0) | |
return getAggregate(); | |
else { | |
return _op(_storage.back().aggregate, _inv(_storage[index - 1].aggregate)); | |
} | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
uint64 AggregatedTimeSeries<V, T, TimeSeriesStorage>::getCount() const | |
{ | |
return _storage.size(); | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
uint64 AggregatedTimeSeries<V, T, TimeSeriesStorage>::getCount(const T& since) const | |
{ | |
int64 index = _find_insertion_point(since); | |
if (index < 0) | |
index = -index - 1; | |
return _storage.size() - index; | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
const V& AggregatedTimeSeries<V, T, TimeSeriesStorage>::getLastValue() const | |
{ | |
if (!_storage.size()) | |
throw EmptyTimeSeries(); | |
return _storage.back().value; | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
const V& AggregatedTimeSeries<V, T, TimeSeriesStorage>::getFirstValue() const | |
{ | |
if (!_storage.size()) | |
throw EmptyTimeSeries(); | |
return _storage.front().value; | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
void AggregatedTimeSeries<V, T, TimeSeriesStorage>::insert(const V& value, const T& time) | |
{ | |
int64 index = _find_insertion_point(time); | |
if (index < 0) | |
index = -index - 1; | |
V aggregate = value; | |
if (index > 0) | |
aggregate = _op(_storage[index - 1].aggregate, aggregate); | |
typename TimeSeriesStorage::iterator it = _storage.begin(); | |
std::advance(it, index); | |
// Insert new element, with propper aggregated value | |
it = _storage.insert( | |
it, | |
AggregatedTimeSeriesItem<V, T>( | |
time, | |
value, | |
aggregate | |
) | |
); | |
// Update all elements in front of newly inserted one | |
for ( | |
++it; | |
it != _storage.end(); | |
++it | |
) | |
it->aggregate = aggregate = _op(aggregate, it->value); | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
int64 AggregatedTimeSeries<V, T, TimeSeriesStorage>::_find_insertion_point(const T& time) const | |
{ | |
uint64 low = 0, | |
high = _storage.size() - 1; | |
// Optimization for value before first value and after last value in storage | |
if (not _storage.size()) | |
return -1; | |
else if (_storage[high].time < time) | |
return -_storage.size() - 1; | |
else if (_storage[low].time > time) | |
return -1; | |
while (low < high) | |
{ | |
int64 selection = (high + low) / 2; | |
const T& selected_time = _storage[selection].time; | |
if (selected_time > time) | |
high = selection - 1; | |
else if (selected_time < time) | |
low = selection + 1; | |
else | |
return selection; | |
} | |
return -low - 1; | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
void AggregatedTimeSeries<V, T, TimeSeriesStorage>::clear() | |
{ | |
_storage.clear(); | |
} | |
template <class V, class T, class TimeSeriesStorage> | |
void AggregatedTimeSeries<V, T, TimeSeriesStorage>::flush(const T& until) | |
{ | |
int64 index = _find_insertion_point(until); | |
if (index < 0) | |
index = -index - 1; | |
typename TimeSeriesStorage::iterator it = _storage.begin(); | |
std::advance(it, index); | |
_storage.erase(_storage.begin(), it); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment