Given a changefeed of the form:
stream.reduce(f).changes({includeInitial: <II>, includeStates: <IS>})
Assume that for the given f, we know the following properties:
<f_BASE>
the initial accumulator for f
<f_APPLY>
a function from the accumulator and an element in the input table to a new accumulator
<f_UNAPPLY>
the inverse of <f_APPLY> in the accumulator, i.e. <f_UNAPPLY>(<f_APPLY>(acc, el), el) yields acc again
<f_EMIT>
generates a result value of the reduction from the current accumulator
Now the query can be rewritten into:
stream.changes({includeInitial: true, includeStates: true}).fold(
{f_acc: <f_BASE>, is_initialized: false},
function(acc, el) {
var f_acc = acc('f_acc');
var new_f_acc = r.branch(el.hasFields("old_val"), <f_UNAPPLY>(f_acc, el('old_val')), f_acc).do(function(un_f_acc) {
return r.branch(el.hasFields("new_val"), <f_APPLY>(un_f_acc, el('new_val')), un_f_acc);
});
var new_is_initialized = acc('is_initialized').or(el.hasFields('state').and(el('state').eq('ready')));
return {f_acc: new_f_acc, is_initialized: new_is_initialized};
},
{emit: function(old_acc, el, new_acc) {
var old_f_acc = old_acc('f_acc');
var new_f_acc = new_acc('f_acc');
var old_val = <f_EMIT>(old_f_acc);
var new_val = <f_EMIT>(new_f_acc);
// We handle the 'ready' state separately below
var emit_state = r.expr(<IS>).and(el.hasFields('state')).and(r.expr(<II>).not().or(el('state').ne('ready')));
var emit_update = old_acc('is_initialized').and(old_val.ne(new_val));
var emit_initial = r.expr(<II>).and(old_acc('is_initialized').not().and(new_acc('is_initialized')));
return r.branch(
emit_state, [el],
emit_update, [{'old_val': old_val, 'new_val': new_val}],
emit_initial, r.branch(<IS>, [{'new_val': new_val}, {state: "ready"}], [{'new_val': new_val}]),
[]
);
}})
For example, for count():
<f_BASE> = 0
<f_APPLY> = function(acc, el) { return acc.add(1); }
<f_UNAPPLY> = function(acc, el) { return acc.sub(1); }
<f_EMIT> = function(acc) { return acc; }
Or for avg():
<f_BASE> = {c: 0, sum: 0}
<f_APPLY> = function(acc, el) { return {c: acc('c').add(1), sum: acc('sum').add(el) }; }
<f_UNAPPLY> = function(acc, el) { return {c: acc('c').sub(1), sum: acc('sum').sub(el) }; }
<f_EMIT> = function(acc) { return acc('sum').div(acc('c')); }
(plus some sort of handling for empty input sets, which we still need to come up with: while the input is empty, c is 0 and <f_EMIT> would divide by zero)