Skip to content

Instantly share code, notes, and snippets.

@welch
Last active August 29, 2015 14:19
Show Gist options
  • Save welch/228c401224dcdc90a49f to your computer and use it in GitHub Desktop.
Save welch/228c401224dcdc90a49f to your computer and use it in GitHub Desktop.
juttle forecast modules: trends and rate of change

Robust time derivatives

In this example we use trend estimation as a robust way to estimate the rate of change of a metric at a point in time. The constant every controls the interval of data we use for each estimate, and the frequency at which we update the estimate. The trend module (which is more typically used over long intervals of time) is applied to this very short interval. The slope of the fitted trend is returned by trend.rate, and the trended change over an interval as trend.change (which is in the same units as the input field, often more convenient for alerting and display)

Try different durations for every to see its effect. The simulated cpu sends a new value every 10 seconds, so every should be at least :20s: so it has enough samples to fit a line to them. Longer windows will give smoother derivative curves.

In this example, a every between :45s: and :2m: does a good job of highlighting the big breaks in the cpu curve while ignoring the noise.

import "sources.juttle" as sources;
import "trend.juttle" as trend;
const start = :2014-01-01: ;
const dt = :60s: ;
sources.ripple_cpu -from start -to start + :1h:
| trend.change -in 'cpu' -every dt -from start -out 'rate'
| split
| @timechart -title "60s change" -display.dataDensity 0
// math utilities
//
// Exported subs:
//
// Exported functions:
// inv2x2(a,b,c,d): invert 2x2 matrix
// isFinite(x): is x a good number?
//
// Exported reducers:
//
/////////////////////////////////////////////////////////////////////////
//
// inv2x2:
// invert a 2x2 matrix | a, b ; c, d | (row major order)
//
export function inv2x2(a, b, c, d) {
var det = (a * d - b * c) ;
return [d / det, -b / det, -c / det, a / det] ;
}
//
// isFinite:
// verify x is a good number.
//
export function isFinite(x) {
return (x == x && x != null && x != x + 1);
}
// simulated sources for demos and tests
//
// Exported subs:
// bumpy_cpu: a 10-minute cpu metric with daily variation
// ripple_cpu: a 10-second cpu metric with minute-variation
//
// Exported functions:
//
// Exported reducers:
//
////////////////////////////////////////////////////////////////////////
//
// bumpy_cpu:
// a 10-minute cpu metric with noise and daily periodicity but no trend
//
function weekend_factor(time) {
if ((Date.get(time,'h') > 22 && Date.get(time, 'e') == 5)
|| Date.get(time, 'e') == 6
|| (Date.get(time,'h') < 6 && Date.get(time, 'e') == 1)) {
return 0.6 ;
} else if (Date.get(time, 'e') == 0) {
return 0.3 ;
} else {
return 1.0;
}
}
export sub bumpy_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -dos .3 -dos_dur :15s: -daily 3
-from from -to to -every :10m:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value" * weekend_factor(time)
| keep cpu, time
}
//
// ripple_cpu:
// a 10-second cpu metric with noise and daily periodicity but no trend
//
export sub ripple_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -daily 0
-ripple .5 -ripple_dur 60 -ripple_levels [-0.25, -0.25, 0, 0, 0, 0, 0.5]
-from from -to to -every :10s:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value"
| keep cpu, time
}
// example:
const start = :2014-01-01: ;
const dur = :1h: ;
bumpy_cpu -from :-2d: -to :now: | @timechart -title "2-day cpu variation"
;
ripple_cpu -from start -to start+dur
| @timechart -title "hourly cpu variation" -display.dataDensity 0
// model trends in a series via linear regression.
//
// Exported subs:
// fit: do a least-squares fit of a line to successive batches of points,
// and calculate the trend's value at each point. A fit is computed for each batch.
// ... | fit -in 'cpu' -every :2h: -over :8h: -t0 :2014-01-01: -out 'trend';
// fit_initial: do a least-squares fit of a line to an initial window of points,
// and calculate this trend's value at subsequent points. A fit is computed once.
// ... | fit_initial -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'trend';
// fit_trailing: do a least-squares fit of a line to a moving window of points
// and calculate the trend value at each point. A fit is computed from each point.
// ... | fit_trailing -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'trend';
// rate: robust rate of change estimation by fitting a trend to trailing data.
// ... | rate -in 'cpu' -dt :5m: -t0 :2014-01-01: -out 'rate' ;
// change: trend-based change over an interval. This is rate * dt and is in the same units as the input.
// ... | change -in 'cpu' -dt :5m: -t0 :2014-01-01: -out 'change' ;
// subtract: de-trend a series of points or successive batches of points.
// ... | subtract -in 'cpu' -every :2h: -over :8h: -t0 :2014-01-01: -out 'detrend_2h' ;
// subtract_trailing: de-trend a series of points by re-fitting at each point.
// ... | subtract_trailing -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'detrend_2h' ;
//
// Exported reducer:
// fit_coeffs: linear regression on a timeseries
// return coefficients of least-squares fit of a line to a specified field on a batch of points.
// ... | reduce line = coeffs(y, t0)
// | put y0 = value(t0, coeffs), y1 = value(t1, coeffs)
// | ...
//////////////////////////////////////////////////////////////////////////////////////
import "sources.juttle" as sources;
import "math.juttle" as math;
//
// fit_coeffs: Linear regression vs time, for a batch or window of points.
// parameters:
// y -- field to fit
// t0 -- time origin for first fitted line
// returns vector [b, m, t0, n]
// b -- yintercept, m: slope (per second), n: number of points fit.
//
export reducer fit_coeffs(y, t0) {
var initial = true ;
var T, Y, T2, TY; // accumulate t, y, ... etc
var n;
function update() {
if (initial) {
T = Y = T2 = TY = n = 0;
initial = false;
}
var t = Duration.seconds(*"time" - t0) ;
T = T + t;
Y = Y + *y;
T2 = T2 + t * t;
TY = TY + t * *y;
n = n + 1;
}
function expire() {
var t = Duration.seconds(*"time" - t0) ;
T = T - t;
Y = Y - *y;
T2 = T2 - t * t;
TY = TY - t * *y;
n = n - 1;
}
function result() {
var inv = math.inv2x2(T2, T, T, n) ;
var m = inv[0] * TY + inv[1] * Y;
var b = inv[2] * TY + inv[3] * Y;
return [b, m, t0, n];
}
function reset() {
initial = true;
}
}
//
// value:
// evaluate the trendline represented by coeffs at the given time
//
export function value(t, coeffs, default) {
if (math.isFinite(coeffs[0]) && math.isFinite(coeffs[1])) {
return coeffs[0] + Duration.seconds(t - coeffs[2]) * coeffs[1];
} else {
return default;
}
}
//
// fit:
// construct a sequence of linear fits over epochs specified by -every@from.
// output each point's projection onto the fitted line covering its epoch.
// parameters:
// in -- input field
// every -- data window for fitting
// from -- time origin for first fitted line
// out -- output field for projected line's value on each point
//
export sub fit(in, every, from, out) {
(
reduce -every every -over every -on from __fit_tmp=fit_coeffs(in, from)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// fit_initial:
// fit coefficients only once against the initial window,
// thereafter projecting against this same trend as time advances.
// For periodic data, this is a little dangerous if not run against an even
// multiple of the periodicity, because the extra partial-period data
// gives rise to a nonzero trend (phase-shift). But really, if you know you
// have periodic data, you shouldn't be fitting a trend!
//
// parameters:
// in -- input field
// initial -- duration of initial batch to fit
// from -- time origin for initial batch
// out -- output field for projected line's value on each point
//
export sub fit_initial(in, initial, from, out) {
(
reduce -every initial -over initial -on from -from from -to from+initial __fit_tmp=fit_coeffs(in, from)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// fit_trailing:
// construct a linear fit over a trailing window of points
// and put its value on the incoming point. A new fit is computed at each point.
// parameters:
// in -- input field
// over -- trailing window to fit trend over
// from -- time origin for first fitted line
// out -- output field for projected line's value
//
export sub fit_trailing(in, over, from, out) {
put -over over __fit_tmp = fit_coeffs(in, from)
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// subtract:
// de-trend intervals of points by subtracting their MSE linear fit.
// There will be a trend discontinuity at each refitting interval, so if
// this is a concern you should use subtract_trailing
// parameters:
// in -- field to de-trend
// every -- re-fitting interval
// over -- data window for fitting (>= every)
// from -- earliest time
// out -- output field for detrended value on each point
//
export sub subtract(in, every, over, from, out) {
fit -in in -every every -over over -from from -out '__subtract_tmp'
| put *out = *in - __subtract_tmp
| remove __subtract_tmp
}
//
// subtract_trailing:
// de-trend intervals of points by subtracting the MSE linear fit
// as each point arrives.
// parameters:
// in -- field to de-trend
// over -- data window for fitting
// from -- earliest time
// out -- output field for detrended value on each point
//
export sub subtract_trailing(in, over, from, out) {
fit_trailing -in in -over over -from from -out '__subtract_tmp'
| put *out = *in - __subtract_tmp
| remove __subtract_tmp
}
//
// rate:
// robustly estimate rate of change by fitting trendlines to a short moving
// window of points anchored at each successive point, and returning their slopes.
// parameters:
// in -- field to rate-estimate
// every -- time window for rate estimator (longer values give smoother derivatives)
// from -- earliest time
// out -- output field for rate (in/second) on each point
//
export sub rate(in, every, from, out) {
(
reduce -every every -over every -on from __rate_tmp=fit_coeffs(in, from)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = math.isFinite(__rate_tmp[1]) ? __rate_tmp[1] : null
| remove __rate_tmp
}
//
// change:
// estimate the amount of change over an interval due to a linear trend over the interval.
// parameters:
// in -- field to rate-estimate
// every -- time window for rate estimator (longer values give smoother derivatives)
// from -- earliest time
// out -- output field for amount change in this interval due to linear trend.
//
export sub change (in, every, from, out) {
rate -in in -every every -from from -out out
| put *out = *out * Duration.seconds(every)
}
//
// rmse:
// root mean square error of a line fitted to an interval. This is an estimate of how nonlinear the
// input was over an interval.
// parameters:
// in -- field to estimate
// every -- time window for rmse estimator
// from -- earliest time
// out -- output field
//
export sub rmse(in, every, from, out) {
fit -in in -every every -from from -out '__rmse_tmp'
| put __rmse_se = (*in - __rmse_tmp) * (*in - __rmse_tmp)
| reduce -every every __rmse_sse = sum(__rmse_se)
| put *out = Math.sqrt(__rmse_sse)
| remove __rmse_tmp, __rmse_se, __rmse_sse
}
//
// demo: long trends
// fit trendlines to successive batches of points in various ways,
// over long intervals, for de-trending prior to de-seasonalizing and
// anomaly detection. superimpose them on the original data.
//
const start = :2014-01-01: ;
const dur = :1w: ;
const every = :12h: ;
const over = :3d: ;
sources.bumpy_cpu -from start -to start + dur
| (
// fit a trend to an initial window of data, and apply it to all data going forward.
fit_initial -in 'cpu' -initial over -from start -out 'initial_trend'
| put detrend_initial = cpu - initial_trend ;
// fit a trend to a moving window of data anchored at the
// current point, updating it for each new point.
fit_trailing -in 'cpu' -over over -from start -out 'trend_trailing'
| put detrend_trailing = cpu - trend_trailing ;
)
| split
| @timechart -title "weekly trend" -display.dataDensity 0
;
//
// demo: use change to highlight rate-of-change anomalies
//
sources.ripple_cpu -from start -to start + :1h:
| change -in 'cpu' -every :60s: -from start -out 'rate'
| split
| @timechart -title "60s rate of change" -display.dataDensity 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment