-
-
Save xgrommx/6e86ce77df23db2578223bae4f9b1dfb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function Signal(sub) { | |
this.sub = sub; | |
} | |
Signal.prototype = { | |
/** | |
* Applies the given function over each event. | |
*/ | |
map: function(fn) { | |
var sub = this.sub; | |
return new Signal(function(publish, report, complete) { | |
return sub(function(event) { | |
publish(fn(event)); | |
}, report, complete); | |
}); | |
}, | |
/** | |
* Applies the given function to each event. | |
* For each event that succeeds, it is emitted. | |
*/ | |
filter: function(fn) { | |
var sub = this.sub; | |
return new Signal(function(publish, report, complete) { | |
return sub(function(event) { | |
if (fn(event)) { | |
publish(event); | |
} | |
}, report, complete); | |
}); | |
}, | |
/** | |
* Merges two signals. | |
*/ | |
merge: function(that) { | |
var subThat = that.sub; | |
var subThis = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var thatCompleted = false; | |
var thisCompleted = false; | |
function completeThat() { | |
thatCompleted = true; | |
if (thisCompleted) { | |
complete(); | |
} | |
} | |
function completeThis() { | |
thisCompleted = true; | |
if (thatCompleted) { | |
complete(); | |
} | |
} | |
var cancelThat = subThat(publish, report, completeThat); | |
var cancelThis = subThis(publish, report, compelteThis); | |
return function() { | |
cancelThat(); | |
cancelThis(); | |
}; | |
}); | |
}, | |
/** | |
* Parallelizes a signal of signals into | |
* one signal by merging each event. | |
*/ | |
parallelize: function() { | |
var sub = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var completed = false; | |
var uncompleted = 0; | |
return sub(function(event) { | |
// This code will fail horribly if | |
// the completion callback is called | |
// more than once. Good luck! | |
uncompleted++; | |
event.sub(publish, report, function() { | |
uncompleted--; | |
if (uncompleted === 0 && completed) { | |
complete(); | |
} | |
}); | |
}, report, function() { | |
// If there are no more uncompleted children, | |
// complete the signal, otherwise mark the | |
// parent as completed | |
if (uncompleted === 0) { | |
complete(); | |
} else { | |
completed = true; | |
} | |
}); | |
}); | |
}, | |
/** | |
* Applies the given function over each | |
* event and merges the resulting | |
* signals. | |
* | |
* `signal.parallel(fn)` is equivalent | |
* to `signal.map(fn).parallelize()`. | |
*/ | |
parallel: function(fn) { | |
return this.map(fn).parallelize(); | |
}, | |
/** | |
* Concatenates two signals. | |
*/ | |
concat: function(that) { | |
var subThat = that.sub; | |
var subThis = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var cancel = subThis(publish, report, function() { | |
cancel = subThat(publish, report, complete); | |
}); | |
return function() { | |
cancel(); | |
}; | |
}); | |
}, | |
/** | |
* Flattens a signal of signals into one | |
* single by concatenating each event. | |
*/ | |
flatten: function() { | |
var sub = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var completed = false; | |
var queue = []; | |
return sub(function(event) { | |
if (queue.length === 0) { | |
event.sub(publish, report, function next() { | |
// If there are children left in the queue, | |
// subscribe to the next one with the same | |
// complete callback, otherwise if the parent | |
// is completed and there are no children | |
// waiting, complete the signal. | |
if (queue.length !== 0) { | |
queue.shift().sub(publish, report, next); | |
} else if (completed) { | |
complete(); | |
} | |
}); | |
} else { | |
queue.push(event); | |
} | |
}, report, function() { | |
// If there are still children in the queue, | |
// mark the parent as completed | |
if (queue.length === 0) { | |
complete(); | |
} else { | |
completed = true; | |
} | |
}); | |
}); | |
}, | |
/** | |
* Applies the given function over each | |
* event and concatenates the resulting | |
* signals. | |
* | |
* `signal.chain(fn)` is equivalent | |
* to `signal.map(fn).flatten()`. | |
*/ | |
chain: function(fn) { | |
return this.map(fn).flatten(); | |
}, | |
/** | |
* Like `parallelize` except cancels | |
* each signal before subscribing to | |
* the next. | |
*/ | |
focus: function() { | |
var sub = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var cancel = function() {}; | |
return sub(function(event) { | |
cancel(); | |
cancel = event.sub(publish, report, function() {}); | |
}, report, function() { | |
cancel(); | |
complete(); | |
}); | |
}); | |
}, | |
/** | |
* Like `parallel` except uses `focus` | |
* for signal joining. | |
*/ | |
expand: function(fn) { | |
return this.map(fn).focus(); | |
}, | |
/** | |
* Zips events from two signals into one. | |
* Uses queue system to keep events in | |
* order. | |
*/ | |
zip: function(that, fn) { | |
var subThat = that.sub; | |
var subThis = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var thatCompleted = false; | |
var thisCompleted = false; | |
function completeThat() { | |
thatCompleted = true; | |
if (thisCompleted) { | |
complete(); | |
} | |
} | |
function completeThis() { | |
thisCompleted = true; | |
if (thatCompleted) { | |
complete(); | |
} | |
} | |
var thatQueue = []; | |
var thisQueue = []; | |
var cancelThat = subThat(function(event) { | |
if (thisQueue.length === 0) { | |
thatQueue.push(event); | |
} else { | |
publish(fn(thisQueue.shift(), event)); | |
} | |
}, report, completeThat); | |
var cancelThis = subThat(function(event) { | |
if (thatQueue.length === 0) { | |
thisQueue.push(event); | |
} else { | |
publish(fn(event, thatQueue.shift())); | |
} | |
}, report, completeThis); | |
return function() { | |
cancelThat(); | |
cancelThis(); | |
}; | |
}); | |
}, | |
/** | |
* Like `zip` except only applies latest | |
* known events from each signal. | |
*/ | |
combine: function(that, fn) { | |
var subThat = that.sub; | |
var subThis = this.sub; | |
return new Signal(function(publish, report, complete) { | |
var thatCompleted = false; | |
var thisCompleted = false; | |
function completeThat() { | |
thatCompleted = true; | |
if (thisCompleted) { | |
complete(); | |
} | |
} | |
function completeThis() { | |
thisCompleted = true; | |
if (thatCompleted) { | |
complete(); | |
} | |
} | |
var ref = {}; | |
var latestThat = ref; | |
var latestThis = ref; | |
var cancelThat = subThat(function(event) { | |
latestThat = event; | |
if (latestThis !== ref) { | |
publish(fn(latestThis, latestThat)); | |
} | |
}, report, completeThat); | |
var cancelThis = subThat(function(event) { | |
latestThis = event; | |
if (lastThat !== ref) { | |
publish(fn(latestThis, latestThat)); | |
} | |
}, report, completeThis); | |
return function() { | |
cancelThat(); | |
cancelThis(); | |
}; | |
}); | |
} | |
}; | |
Signal.of = function(event) { | |
return new Signal(function(publish, report, complete) { | |
publish(event); | |
complete(); | |
}); | |
}; | |
Signal.fromEventEmitter = function(emitter, eventName) { | |
var subscribe = emitter.on || | |
emitter.addListener || | |
emitter.addEventListener; | |
var unsubscribe = emitter.off || | |
emitter.removeListener || | |
emitter.removeEventListener; | |
return new Signal(function(publish, report, complete) { | |
subscribe.call(emitter, eventName, publish); | |
return function() { | |
unsubscribe.call(emitter, eventName, publish); | |
}; | |
}); | |
}; | |
module.exports = Signal; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment