Last active
October 20, 2015 21:13
-
-
Save welch/0c2600f05fbff773591f to your computer and use it in GitHub Desktop.
predict: return the unsurprising portion of a metric stream by estimating trend and seasonal effects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Predict: | |
// Trend, seasonality, and noise estimation for metric streams. | |
// | |
// Predict is a sub which consumes a metric stream and emits a | |
// prediction stream based on estimated trend, seasonality, and a | |
// smoothed version of the input. Prediction errors (the difference | |
// between predict's output and the input) can signal anomalies in the | |
// stream. | |
// | |
// usage: predict [options] | |
// options: | |
// -field ("value") Name of metric field to predict | |
// -over (:w:) Period: the length of one repeating cycle in the input metric. | |
// -every (default is based on period) interval between emitted prediction points | |
// -pct (0.5) percentile to retain during initial reduction to every-spaced values | |
// -nonneg (true) do not allow predictions to become negative. | |
// -detrend (true) if false, do not remove estimated trend | |
// -deseason (true) if false, do not remove estimated cyclic effect | |
// -denoise (true) if false, do not smooth the detrended/deseasoned value | |
// | |
// predict is intended to be used with a historic or superquery that | |
// includes enough history to initialize its estimators. predict can | |
// begin emitting prediction points almost immediately, though the | |
// early points will simply be de-meaned. After 2 periods have gone by, | |
// trend and seasonality (at the resolution of -every) will switch on, | |
// and after 3 periods have passed all estimators have full windows of | |
// data. | |
// | |
// output: | |
// each output point contains the following fields: | |
// *field : average value of input field over -every | |
// T : portion of field predicted by the trend estimator | |
// S : portion of field predicted by seasonality | |
// Y : portion of field predicted by smoothing | |
// P : predicted value of field, T + S + Y | |
// E : prediction error, P - field | |
// Z : normalized error based on the sample stddev over trailing periods | |
// | |
// Return the sample Z-score of the specified field. | |
// | |
// Parameters: | |
// | |
// * field: fieldname | |
// | |
// compute an approximate Z-score for an input value, using sample | |
// estimates of population mean and stddev: | |
// (value - sample mean) / sample stddev | |
// | |
// Moving-window sample Z-score reducer. update()/expire() are invoked
// by the runtime as points enter and leave the window; sum/ssum/n
// maintain running first and second moments so result() is O(1).
reducer z(field) {
    var sum = 0;
    var ssum = 0;
    var n = 0;
    var latest = null;   // most recently seen non-null value
    function update() {
        var v = *field;
        if (v != null) {
            latest = v;
            sum = sum + v;
            ssum = ssum + v*v;
            n = n + 1;
        }
    }
    function result() {
        if (n < 2 ) {
            return null;   // need at least 2 samples for a sample stddev
        } else {
            var mean = sum / n;
            // clamp at 0: (ssum - sum*sum/n) can drift slightly negative
            // from floating-point cancellation as values expire, which
            // would make sqrt() NaN.
            var stddev = Math.sqrt(Math.max(ssum - sum*sum/n, 0) / (n-1));
            if (stddev == 0) {
                // constant window: Z-score is undefined, not infinite
                return null;
            } else {
                return (latest - mean) / stddev;
            }
        }
    }
    function expire() {
        // undo update() for a point leaving the moving window
        var v = *field;
        if (v != null) {
            sum = sum - v;
            ssum = ssum - v*v;
            n = n - 1;
        }
    }
};
// Exponential weighted moving average | |
// | |
// Parameters: | |
// | |
// * field: fieldname | |
// * alpha: weight of newest value in moving average | |
// * initial: initial value of the moving average, defaults to first update | |
// * offset: name of field containing an offset to be applied to the stored value | |
// | |
reducer ewma(field, alpha, initial=null, offset=null) {
    // running average; seeded from `initial`, or lazily from the
    // first non-null sample when initial is null
    var value = initial;
    function update() {
        if (*field != null) {
            if (value == null) {
                // first sample with no initial: start the average at
                // the sample itself (the blend below is then a no-op)
                value = *field;
            }
            value = value + alpha * (*field - value);
            // optionally shift the stored average by a per-point
            // offset field (used to absorb a known discontinuity)
            if (offset != null && *offset != null) {
                value = value - *offset;
            }
        }
    }
    function result() {
        return value;
    }
}
// Return the Nth lagged value of a field | |
// | |
// Parameters: | |
// | |
// * field: name of value field | |
// * N: (1) lag | |
// | |
// Nth-lag reducer: ring buffer of the last N+1 non-null values.
// `i` points at the slot to be overwritten next which, once the
// buffer is full, holds the oldest value — i.e. the Nth lag.
// Returns `undef` until N+1 values have been buffered.
reducer L(field, N=1, undef=null) {
    var values = [];
    var n = 0, i = 0;   // n: fill count (saturates at N+1), i: write cursor
    function update() {
        if (*field != null) {
            values[i] = *field;
            i = (i + 1) % (N + 1);
            if (n <= N) {
                n = n + 1;
            }
        }
    }
    function result() {
        if (n == N + 1) {
            return values[i];   // oldest buffered value = Nth lag
        } else {
            return undef;       // not enough history yet
        }
    }
}
// Return a nice subdividing duration, preserving calendarness where possible | |
// | |
// Parameters: | |
// | |
// * d: duration to subdivide | |
// | |
export function subdivide_duration(d) {
    // walk down the calendar units; the first unit that divides d into
    // a whole number picks a calendar-friendly subdivision for it
    if (Duration.as(d, "y") == Math.floor(Duration.as(d, "y"))) {
        return :M:;    // whole years -> subdivide by month
    } else if (Duration.as(d, "M") == Math.floor(Duration.as(d, "M"))) {
        return :d:;    // whole months -> by day
    } else if (Duration.as(d, "w") == Math.floor(Duration.as(d, "w"))) {
        return :6h:;   // whole weeks -> by 6 hours
    } else if (Duration.as(d, "d") == Math.floor(Duration.as(d, "d"))) {
        return :h:;    // whole days -> by hour
    } else if (Duration.as(d, "h") == Math.floor(Duration.as(d, "h"))) {
        return :5m:;   // whole hours -> by 5 minutes
    } else {
        // non-calendar duration: fall back to an arbitrary 1/30 split
        return d / 30;
    }
}
// estimate trailing trend and median as the median | |
// duration-over-duration change of all samples in a window of -over | |
// duration (a variant of the Theil-Sen estimator). This can use up to | |
// 2 periods of historic data per point, but begins outputting a | |
// windowed median immediately. trend expects its input to already be | |
// aggregated -every. | |
// | |
// Parameters: | |
// | |
// * field: Name of metric field to predict | |
// * over: Period: length of one repeating cycle in the input metric. | |
// * every: interval between emitted prediction points | |
// * revise: if true, include the current value in the slope estimate. | |
// | |
// Output: | |
// | |
// * T: portion of field predicted by trend | |
// * M: slope (change per over), or null if no slope was estimated | |
// (T will have a trailing median value if M is null) | |
// | |
sub trend(field, over, every, revise=true) {
    // number of -every slots per period; pairs samples one full period
    // apart by giving them the same bucket index
    const N = Math.round(over / every);
    put __bucket = count() % N
    | put __per_over = delta(field,null) by __bucket // same bucket, successive periods
    | put -over 2 * over
        __t0 = first(time) + (last(time) - first(time)) / 2,        // anchor time: midpoint of the window
        __y0 = percentile(field,0.5),                               // anchor level: windowed median
        M = __per_over != null ? percentile(__per_over,0.5) : null  // slope: median change-per-period (Theil-Sen variant)
    | put M = revise ? M : L("M") ?? M // use current or previous M/y0/t0 to predict trend at time
    | put __y0 = revise ? __y0 : L("__y0") ?? __y0
    | put __t0 = revise ? __t0 : L("__t0") ?? __t0
    // extrapolate the line through (t0, y0) with slope M; until a
    // slope exists, T degrades to the trailing median level
    | put T = (M != null) ? __y0 + M * (time - __t0) / over : __y0
    | remove __t0, __y0, __bucket, __per_over
}
// estimate cyclic variation as a moving median of the values over | |
// successive epochs at a given phase. This can use up to 3 periods | |
// of historic data per point, and at startup simply replays the most | |
// recent value. seasonal expects its input to already be de-trended | |
// and aggregated -every. | |
// | |
// Parameters: | |
// | |
// * field: Name of metric field to predict | |
// * over: Period: length of one repeating cycle in the input metric. | |
// * every: interval between emitted prediction points | |
// * revise: if true, include the current value in the seasonal estimate. | |
// | |
// Output: | |
// | |
// * S: portion of field predicted by seasonality | |
// | |
sub seasonal(field, over, every, revise) {
    // slots per period; points at the same phase of successive periods
    // share a bucket
    const N = Math.round(over / every);
    put __bucket = count() % N, __n=count()
    // per-phase moving median over up to 3 periods; suppressed (null)
    // for the first few points while there is too little history
    | put -over 3 * over S = (__n > 4) ? percentile(field,0.5) : null by __bucket
    // NOTE(review): unlike level/trend there is no `?? S` fallback here,
    // so revise=false yields null S until a lag exists — confirm intended
    | put S = revise ? S : L("S") by __bucket // use current or previous seasonal at time
    | remove __bucket, __n
}
// windowed median as a less-noisy prediction of mean. the optional | |
// offset field is examined for a nonzero impulse to be added to the | |
// windowed values state. This allows incorporation of a
// discontinuity in the series. | |
// | |
// Parameters: | |
// | |
// * field: Name of metric field to predict | |
// * every: interval between emitted prediction points | |
// * offset: fieldname for an impulse to be added | |
// * revise: if false, return previous value as the prediction | |
// | |
// Output: | |
// | |
// * Y : portion of field predicted by smoothing | |
// | |
sub level(field, every, offset, revise) {
    // 3-point windowed median as the smoothed level Y
    put -over 3 * every Y = percentile(field, 0.5)
    // revise=false: report the previous window's value instead of one
    // that includes the current point (fall back to current at startup)
    | put Y = revise ? Y : (L("Y") ?? Y)
    // fold any impulse offsets in the window into Y, so a known
    // discontinuity does not appear as prediction error
    | put -over 3 * every Y = Y + (sum(offset) ?? 0)
}
// predict the mean of the distribution of next field value at the | |
// current time from prior values (revise=false), optionally including | |
// the current field value in the trend and seasonal estimates | |
// (revise=true). This can use up to 3 periods of historic data per | |
// point, but can begin producing (noisy) point-to-point estimates | |
// after a few points. | |
// | |
// Parameters: | |
// | |
// * field: ("value") Name of metric field to predict | |
// * over: (:w:) Period: length of one repeating cycle in the input metric. | |
// * every: (default is based on value of over) interval between emitted prediction points | |
// * pct: (0.5) percentile to retain during initial reduction to every-spaced values | |
// * nonneg: (true) do not allow predictions to become negative. | |
// * detrend: (true) if false, do not remove estimated trend | |
// * deseason: (true) if false, do not remove estimated cyclic effect | |
// * denoise: (true) if false, do not smooth the detrended/deseasoned value | |
// * revise: (true) if false, do not include current value in trend and seasonal estimates | |
// | |
// Output: | |
// | |
// predict consumes its input stream, and outputs points every -every. | |
// each output point contains the following fields: | |
// * *field: percentile value of input field over -every, from -pct (default median) | |
// * T: portion of field predicted by the trend estimator | |
// * S: portion of field predicted by seasonality | |
// * Y: portion of field predicted by smoothing | |
// * P: predicted value of field, T + S + Y | |
// * E: prediction error, P - field | |
// * Z: normalized error based on the sample stddev over trailing periods | |
// | |
export sub predict(field = "value", over = :w:, every=null, pct=0.5, nonneg=true,
                   detrend=true, deseason=true, denoise=true, revise=true) {
    const __every = every ?? subdivide_duration(over);
    // 1. reduce the raw stream to one every-spaced point, keeping the
    //    -pct percentile of the field and a quantized timestamp
    reduce -every __every time= Date.quantize(first(time), __every), *field = percentile(field, pct)
    // 2. estimate and (optionally) remove trend
    | trend -field field -every __every -over over -revise revise
    | put T = detrend ? T : 0
    | put __detrend = *field - T
    | put __LT = L("T") ?? 0 // before M becomes valid, T is windowed median.
    // impulse handed to the level estimator at the moment the slope M
    // first switches on, absorbing the resulting step in T
    | put __offset = ((M != null && L("M") == null) ? __LT - T : 0)
    // 3. estimate and (optionally) remove the cyclic effect
    | seasonal -field '__detrend' -over over -every __every -revise revise
    | put S = deseason ? S : 0
    // likewise absorb the step when S first switches on
    | put __offset = __offset - ((S != null && L("S") == null) ? S : 0)
    | put __deseason = __detrend - (S ?? 0)
    // 4. smooth the residual into Y (or, with denoise=false, pass the
    //    lagged residual through)
    | level -field '__deseason' -every __every -revise revise -offset '__offset'
    | put Y = denoise ? Y : (L("__deseason") ?? 0) + __offset
    // 5. recombine components into the prediction and its errors
    | put P = (S ?? 0) + (T ?? 0) + Y
    | put P = nonneg ? Math.max(P, 0) : P
    | put E = P - *field
    | put -over 3*over Z = z('E')   // normalized error over trailing periods
    | remove __LT, __detrend, __deseason, __offset, M
}
////////////////////////////////////////////////////////////////////////////// | |
// predict demos: trend and seasonality | |
// | |
// the following metric sources all work with the default predict interval of :w: | |
// all reads are from jx3 or j4qa as indicated. | |
// | |
// dashboard input: lookback duration used by the live_* demo sources
input last: duration -default :30 days: -label 'Show this duration:';
// hourly count of app juttle-program runs from prod, excluding one
// internal IP; emits both a timechart and a `value` field for predict
export sub live_juttle_runs(dur1=:3M:) {
    read -from (:now: + :1d: - dur1) -space 'prod'
        event = 'run-juttle' AND
        properties_page_currentPage ='https://app.jut.io/#explorer' AND
        context_ip != '207.141.12.50'   // exclude internal traffic
    | reduce -every :h: run_count = count()
    | (@timechart -title "Weekly App Juttle Program Runs" -valueField 'run_count' -display.dataDensity 0; put value=run_count)
}
// hourly docs-site pageviews from prod, excluding known internal IPs
export sub live_pageviews() {
    read -from :now: - last -space 'prod' type = 'page' properties_url ~ 'http://docs.jut.io/*'
        NOT (context_ip in ['207.141.12.50','54.149.158.203','54.68.207.93','52.10.135.75','54.148.36.156','52.24.101.163'])
    | reduce -every :h: pageviews = count(), users = count_unique(anonymousId)
    | (@timechart -title "live pageviews" -valueField 'pageviews' -display.dataDensity 0; put value=pageviews)
}
// hourly unique docs-site users from prod; same query as
// live_pageviews (pageviews is computed but unused here), charting
// the `users` count instead
export sub live_users() {
    read -from :now: - last -space 'prod' type = 'page' properties_url ~ 'http://docs.jut.io/*'
        NOT (context_ip in ['207.141.12.50','54.149.158.203','54.68.207.93','52.10.135.75','54.148.36.156','52.24.101.163'])
    | reduce -every :h: pageviews = count(), users = count_unique(anonymousId)
    | (@timechart -title "live users" -valueField 'users' -display.dataDensity 0; put value=users)
}
// hourly count of slack messages (non-null user only)
export sub slack_messages() {
    // this runs in j4qa rather than jx3
    read -from :-M: -space "slack"
        type = "message" and user_name != null
    | reduce -every :h: name="slack_messages", value = count()
    | (@timechart -title "slack messages" -display.dataDensity 0; pass)
}
// add a rectangular impulse of `value` to *field over [start, start+dur)
sub pulse(field, value, start, dur) {
    put *field = *field + ((time >= start && time < start + dur) ? value : 0)
}
// add a linear ramp to *field: 0 at `start`, rising to `value` at
// start+dur, zero outside [start, start+dur)
sub slide(field, value, start, dur) {
    put *field = *field + (
        (time >= start && time < start + dur)
        ? ((time - start) / dur) * value
        : 0)
}
export sub series(from, to, every, over, trend = 1, season = 10, alpha = 0.5, sigma=0.1, initial=0) {
    // generate a series of values having given trend, seasonality, noise, and autocorrelation (ewma alpha)
    emit -from from -to to -every every
    // dy: fraction of a period elapsed; cycle: one sinusoid per period
    | put n = count(), dy = (time - Date.new(0)) / over, cycle = Math.sin(dy * 2 * Math.PI)
    // uniform noise in [-sigma, sigma], smoothed by ewma to give
    // autocorrelated "step" noise
    | put step = sigma * (2 * Math.random() - 1)
    | put step = ewma("step", alpha, initial)
    | put value = n * trend + season * cycle + step
    // inject a downward ramp anomaly partway through, for the demos
    | slide -field 'value' -value -20 -start from + 5.25 * over -dur 4*every
}
export sub posSeries(from, to, every, over, trend = 1, season = 10, alpha = 0.5, sigma=0.1, initial=0) {
    // like series, but keep it positive
    emit -from from -to to -every every
    // |sin| gives a nonnegative cycle with two humps per period
    | put n = count(), dy = (time - Date.new(0)) / over, cycle = Math.abs(Math.sin(dy * 2 * Math.PI))
    | put step = sigma * (2 * Math.random() - 1)
    | put step = ewma("step", alpha, initial)   // autocorrelated noise
    | put value = n * trend + season * cycle + step
    | slide -field 'value' -value -20 -start from + 5.25 * over -dur 4*every
    | put value = Math.abs(value)   // final clamp into positive territory
}
// null invocation of predict that uses the previous value as the | |
// prediction of the next value. not bad. | |
// | |
// baseline "predictor": with all estimators and revision disabled,
// predict degenerates to using the previous value as the prediction
export sub dontPredict(field = "value", over = :w:, every=null, pct=0.5, nonneg=true) {
    predict -field field -over over -every every -pct pct -nonneg nonneg
        -detrend false -deseason false -denoise false -revise false
}
const over = :w:;
const every = subdivide_duration(over);   // :6h: for a weekly period
// uncomment any one of the following sources:
posSeries -from Date.new(0) -to Date.new(0)+ 8*over -every every -over over -trend .05 -season 20 -alpha 0.5 -sigma 4 //live_juttle_runs
//live_pageviews
//live_users
//slack_messages
// | dontPredict
| predict
| (
    // running stderr of prediction error, plus component/error charts
    put name="stderr", value = sigma("E") | @tile;
    split value, P, T, S, Y | @timechart -keyField 'name' -display.dataDensity 0;
    split E | @timechart -keyField 'name' -display.dataDensity 0;
    split Z | @timechart -keyField 'name' -display.dataDensity 0;
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment