|
// model trends in a series via linear regression. |
|
// |
|
// Exported subs: |
|
// fit: do a least-squares fit of a line to successive batches of points, |
|
// and calculate the trend's value at each point. A fit is computed for each batch. |
|
// ... | fit -in 'cpu' -every :2h: -over :8h: -t0 :2014-01-01: -out 'trend'; |
|
// fit_initial: do a least-squares fit of a line to an initial window of points, |
|
// and calculate this trend's value at subsequent points. A fit is computed once. |
|
// ... | fit_initial -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'trend'; |
|
// fit_trailing: do a least-squares fit of a line to a moving window of points |
|
// and calculate the trend value at each point. A fit is computed from each point. |
|
// ... | fit_trailing -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'trend'; |
|
// rate: robust rate of change estimation by fitting a trend to trailing data. |
|
// ... | rate -in 'cpu' -dt :5m: -t0 :2014-01-01: -out 'rate' ; |
|
// change: trend-based change over an interval. This is rate * dt and is in the same units as the input. |
|
// ... | change -in 'cpu' -dt :5m: -t0 :2014-01-01: -out 'change' ; |
|
// subtract: de-trend a series of points or successive batches of points. |
|
// ... | subtract -in 'cpu' -every :2h: -over :8h: -t0 :2014-01-01: -out 'detrend_2h' ; |
|
// subtract_trailing: de-trend a series of points by re-fitting at each point. |
|
// ... | subtract_trailing -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'detrend_2h' ; |
|
// |
|
// Exported reducer: |
|
// fit_coeffs: linear regression on a timeseries |
|
// return coefficients of least-squares fit of a line to a specified field on a batch of points. |
|
// ... | reduce line = coeffs(y, t0) |
|
// | put y0 = value(t0, coeffs), y1 = value(t1, coeffs) |
|
// | ... |
|
////////////////////////////////////////////////////////////////////////////////////// |
|
import "sources.juttle" as sources; |
|
import "math.juttle" as math; |
|
// |
|
// fit_coeffs: Linear regression vs time, for a batch or window of points. |
|
// parameters: |
|
// y -- field to fit |
|
// t0 -- time origin for first fitted line |
|
// returns vector [b, m, t0, n] |
|
// b -- yintercept, m: slope (per second), n: number of points fit. |
|
// |
|
export reducer fit_coeffs(y, t0) { |
|
var initial = true ; |
|
var T, Y, T2, TY; // accumulate t, y, ... etc |
|
var n; |
|
|
|
function update() { |
|
if (initial) { |
|
T = Y = T2 = TY = n = 0; |
|
initial = false; |
|
} |
|
var t = Duration.seconds(*"time" - t0) ; |
|
T = T + t; |
|
Y = Y + *y; |
|
T2 = T2 + t * t; |
|
TY = TY + t * *y; |
|
n = n + 1; |
|
} |
|
function expire() { |
|
var t = Duration.seconds(*"time" - t0) ; |
|
T = T - t; |
|
Y = Y - *y; |
|
T2 = T2 - t * t; |
|
TY = TY - t * *y; |
|
n = n - 1; |
|
} |
|
function result() { |
|
var inv = math.inv2x2(T2, T, T, n) ; |
|
var m = inv[0] * TY + inv[1] * Y; |
|
var b = inv[2] * TY + inv[3] * Y; |
|
return [b, m, t0, n]; |
|
} |
|
function reset() { |
|
initial = true; |
|
} |
|
} |
|
// |
|
// value: |
|
// evaluate the trendline represented by coeffs at the given time |
|
// |
|
export function value(t, coeffs, default) { |
|
if (math.isFinite(coeffs[0]) && math.isFinite(coeffs[1])) { |
|
return coeffs[0] + Duration.seconds(t - coeffs[2]) * coeffs[1]; |
|
} else { |
|
return default; |
|
} |
|
} |
|
// |
|
// fit: |
|
// construct a sequence of linear fits over epochs specified by -every@from. |
|
// output each point's projection onto the fitted line covering its epoch. |
|
// parameters: |
|
// in -- input field |
|
// every -- data window for fitting |
|
// from -- time origin for first fitted line |
|
// out -- output field for projected line's value on each point |
|
// |
|
export sub fit(in, every, from, out) { |
|
( |
|
reduce -every every -over every -on from __fit_tmp=fit_coeffs(in, from) |
|
// XXX: we don't have a lag operator, so use pace to send the result back in |
|
// time to beginning of its batch so it can be joined with the data it was fitted to. |
|
| pace -from from -every :0s: ; |
|
merge ; |
|
)| join -onchange 2 |
|
| put *out = value(time, __fit_tmp, *in) |
|
| remove __fit_tmp |
|
} |
|
// |
|
// fit_initial: |
|
// fit coefficients only once against the initial window, |
|
// thereafter projecting against this same trend as time advances. |
|
// For periodic data, this is a little dangerous if not run against an even |
|
// multiple of the periodicity, because the extra partial-period data |
|
// gives rise to a nonzero trend (phase-shift). But really, if you know you |
|
// have periodic data, you shouldn't be fitting a trend! |
|
// |
|
// parameters: |
|
// in -- input field |
|
// initial -- duration of initial batch to fit |
|
// from -- time origin for initial batch |
|
// out -- output field for projected line's value on each point |
|
// |
|
export sub fit_initial(in, initial, from, out) { |
|
( |
|
reduce -every initial -over initial -on from -from from -to from+initial __fit_tmp=fit_coeffs(in, from) |
|
// XXX: we don't have a lag operator, so use pace to send the result back in |
|
// time to beginning of its batch so it can be joined with the data it was fitted to. |
|
| pace -from from -every :0s: ; |
|
merge ; |
|
)| join -onchange 2 |
|
| put *out = value(time, __fit_tmp, *in) |
|
| remove __fit_tmp |
|
} |
|
// |
|
// fit_trailing: |
|
// construct a linear fit over a trailing window of points |
|
// and put its value on the incoming point. A new fit is computed at each point. |
|
// parameters: |
|
// in -- input field |
|
// over -- trailing window to fit trend over |
|
// from -- time origin for first fitted line |
|
// out -- output field for projected line's value |
|
// |
|
export sub fit_trailing(in, over, from, out) { |
|
put -over over __fit_tmp = fit_coeffs(in, from) |
|
| put *out = value(time, __fit_tmp, *in) |
|
| remove __fit_tmp |
|
} |
|
// |
|
// subtract: |
|
// de-trend intervals of points by subtracting their MSE linear fit. |
|
// There will be a trend discontinuity at each refitting interval, so if |
|
// this is a concern you should use subtract_trailing |
|
// parameters: |
|
// in -- field to de-trend |
|
// every -- re-fitting interval |
|
// over -- data window for fitting (>= every) |
|
// from -- earliest time |
|
// out -- output field for detrended value on each point |
|
// |
|
export sub subtract(in, every, over, from, out) { |
|
fit -in in -every every -over over -from from -out '__subtract_tmp' |
|
| put *out = *in - __subtract_tmp |
|
| remove __subtract_tmp |
|
} |
|
// |
|
// subtract_trailing: |
|
// de-trend intervals of points by subtracting the MSE linear fit |
|
// as each point arrives. |
|
// parameters: |
|
// in -- field to de-trend |
|
// over -- data window for fitting |
|
// from -- earliest time |
|
// out -- output field for detrended value on each point |
|
// |
|
export sub subtract_trailing(in, over, from, out) { |
|
fit_trailing -in in -over over -from from -out '__subtract_tmp' |
|
| put *out = *in - __subtract_tmp |
|
| remove __subtract_tmp |
|
} |
|
// |
|
// rate: |
|
// robustly estimate rate of change by fitting trendlines to a short moving |
|
// window of points anchored at each successive point, and returning their slopes. |
|
// parameters: |
|
// in -- field to rate-estimate |
|
// every -- time window for rate estimator (longer values give smoother derivatives) |
|
// from -- earliest time |
|
// out -- output field for rate (in/second) on each point |
|
// |
|
export sub rate(in, every, from, out) { |
|
( |
|
reduce -every every -over every -on from __rate_tmp=fit_coeffs(in, from) |
|
// XXX: we don't have a lag operator, so use pace to send the result back in |
|
// time to beginning of its batch so it can be joined with the data it was fitted to. |
|
| pace -from from -every :0s: ; |
|
merge ; |
|
)| join -onchange 2 |
|
| put *out = math.isFinite(__rate_tmp[1]) ? __rate_tmp[1] : null |
|
| remove __rate_tmp |
|
} |
|
// |
|
// change: |
|
// estimate the amount of change over an interval due to a linear trend over the interval. |
|
// parameters: |
|
// in -- field to rate-estimate |
|
// every -- time window for rate estimator (longer values give smoother derivatives) |
|
// from -- earliest time |
|
// out -- output field for amount change in this interval due to linear trend. |
|
// |
|
export sub change (in, every, from, out) { |
|
rate -in in -every every -from from -out out |
|
| put *out = *out * Duration.seconds(every) |
|
} |
|
// |
|
// rmse: |
|
// root mean square error of a line fitted to an interval. This is an estimate of how nonlinear the |
|
// input was over an interval. |
|
// parameters: |
|
// in -- field to estimate |
|
// every -- time window for rmse estimator |
|
// from -- earliest time |
|
// out -- output field |
|
// |
|
export sub rmse(in, every, from, out) { |
|
fit -in in -every every -from from -out '__rmse_tmp' |
|
| put __rmse_se = (*in - __rmse_tmp) * (*in - __rmse_tmp) |
|
| reduce -every every __rmse_sse = sum(__rmse_se) |
|
| put *out = Math.sqrt(__rmse_sse) |
|
| remove __rmse_tmp, __rmse_se, __rmse_sse |
|
} |
|
// |
|
// demo: long trends |
|
// fit trendlines to successive batches of points in various ways, |
|
// over long intervals, for de-trending prior to de-seasonalizing and |
|
// anomaly detection. superimpose them on the original data. |
|
// |
|
const start = :2014-01-01: ; |
|
const dur = :1w: ; |
|
const every = :12h: ; |
|
const over = :3d: ; |
|
|
|
sources.bumpy_cpu -from start -to start + dur |
|
| ( |
|
// fit a trend to an initial window of data, and apply it to all data going forward. |
|
fit_initial -in 'cpu' -initial over -from start -out 'initial_trend' |
|
| put detrend_initial = cpu - initial_trend ; |
|
// fit a trend to a moving window of data anchored at the |
|
// current point, updating it for each new point. |
|
fit_trailing -in 'cpu' -over over -from start -out 'trend_trailing' |
|
| put detrend_trailing = cpu - trend_trailing ; |
|
) |
|
| split |
|
| @timechart -title "weekly trend" -display.dataDensity 0 |
|
; |
|
// |
|
// demo: use change to highlight rate-of-change anomalies |
|
// |
|
sources.ripple_cpu -from start -to start + :1h: |
|
| change -in 'cpu' -every :60s: -from start -out 'rate' |
|
| split |
|
| @timechart -title "60s rate of change" -display.dataDensity 0 |