Last active
March 2, 2024 21:59
-
-
Save vitaly-t/e4475271ad08ee6879da756b31c068a0 to your computer and use it in GitHub Desktop.
Value spike detection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Observable, switchMap} from 'rxjs'; | |
/**
 * Spike detection result.
 */
export interface ISpike {
    /**
     * Detected spike change, in percent.
     * Signed value, computed as `100 * current / origin - 100`.
     */
    change: number;

    /**
     * Current value, compared to the `origin` value.
     */
    current: number;

    /**
     * The value, deviation from which triggered the spike.
     */
    origin: number;

    /**
     * Interval (in ms) between `origin` and `current` values.
     * It is the time it took to produce the spike inside the interval.
     */
    interval: number;

    /**
     * Size of the spike buffer (current value included) at the moment the spike
     * was detected. This provides an indication as to the frequency of spikes,
     * or how many values it took to trigger the last spike.
     *
     * It also helps to detect potential memory overuse: if the interval is too big,
     * or the sequence is too fast, the internal buffer may grow too large.
     */
    size: number;
}
/**
 * SpikeDetector constructor options.
 */
export type SpikeParams = {
    /**
     * Value change, in percent, to verify for spikes.
     * A spike is reported when the absolute change is greater than or equal to this value.
     */
    change: number;

    /**
     * Time interval (in ms) during which to check for spikes.
     * Values older than this interval are discarded before each check.
     */
    interval: number;

    /**
     * Spikes verification strategy:
     *  - false (default) - check current value against the first (oldest) value within interval.
     *  - true - check current value against all previous values within interval.
     *
     * Its use should depend on the business/strategy logic of your data.
     */
    checkAll?: boolean;
};
/**
 * Detects percent-based spikes in value changes.
 * See: https://stackoverflow.com/questions/77989971/value-deviation-pressure-with-rxjs
 *
 * Values are kept in a time-ordered buffer limited to `SpikeParams.interval`;
 * each new value is compared against the oldest buffered value, or against all
 * previous buffered values when `SpikeParams.checkAll` is set.
 */
export class SpikeDetector {

    /**
     * Values received within the current interval, oldest first;
     * the most recent value is always the last element.
     */
    private buffer: Array<{ ts: number, value: number }> = [];

    constructor(private readonly sp: SpikeParams) {
    }

    /**
     * Takes a new value, and if a spike is detected - returns it;
     * otherwise it returns `undefined`.
     *
     * @param value
     * The new value to be checked.
     *
     * @param ts
     * Timestamp (in ms) of the value; defaults to `Date.now()`.
     * Expiry only inspects the oldest entries, so timestamps are expected
     * not to go backwards between calls.
     *
     * @returns
     * Spike details, or `undefined` when there is nothing to compare against yet,
     * or when no change reached the configured threshold.
     */
    next(value: number, ts: number = Date.now()): ISpike | undefined {
        const b = this.buffer;
        // Count the expired values at the front, then drop them in one go:
        let expired = 0;
        while (expired < b.length && ts - b[expired].ts > this.sp.interval) {
            expired++;
        }
        if (expired > 0) {
            b.splice(0, expired);
        }
        b.push({ts, value});
        if (b.length < 2) {
            return undefined; // nothing to compare the value against
        }
        // The last element is the current value itself, so it is never a candidate:
        const candidates = this.sp.checkAll ? b.length - 1 : 1;
        return this.detect(value, ts, candidates);
    }

    /**
     * Removes all elements from the buffer,
     * in case the client logic requires it.
     */
    reset(): void {
        this.buffer.length = 0;
    }

    /**
     * Verifies current value for spikes against the first `candidates` elements
     * in the buffer, oldest first, and reports the first match.
     *
     * It expects at least 2 elements present in the buffer.
     *
     * NOTE: The change is a ratio to the origin value, so a zero origin produces
     * a non-finite change (`NaN` for a zero current value, which never matches).
     */
    private detect(current: number, ts: number, candidates: number): ISpike | undefined {
        const b = this.buffer;
        for (let i = 0; i < candidates; i++) {
            const {ts: originTs, value: origin} = b[i];
            const change = 100 * current / origin - 100;
            if (Math.abs(change) >= this.sp.change) {
                const size = b.length;
                const interval = ts - originTs;
                this.buffer = b.slice(-1); // leaving just the last element
                return {change, origin, current, interval, size};
            }
        }
        return undefined;
    }
}
/**
 * RxJS helper for use of SpikeDetector class.
 *
 * Feeds every value from `input` into a single SpikeDetector and emits only
 * the detected spikes. The detector is created once per call, so its buffer is
 * shared by all subscriptions to the returned observable.
 *
 * @param input
 * Source sequence of numeric values.
 *
 * @param sp
 * Spike detection parameters, passed to the SpikeDetector constructor.
 *
 * @returns
 * Observable of detected spikes.
 */
export function changeSpike(input: Observable<number>, sp: SpikeParams): Observable<ISpike> {
    const sd = new SpikeDetector(sp);
    return input.pipe(switchMap(value =>
        new Observable<ISpike>(obs => {
            const spike = sd.next(value);
            if (spike) {
                obs.next(spike);
            }
            // The inner stream has nothing more to emit; without completing it,
            // switchMap would keep the result open after `input` completes.
            obs.complete();
        })
    ));
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment