Skip to content

Instantly share code, notes, and snippets.

@L8D
Created January 21, 2015 15:47
Show Gist options
  • Save L8D/735888a29eec16499007 to your computer and use it in GitHub Desktop.
Save L8D/735888a29eec16499007 to your computer and use it in GitHub Desktop.
/**
 * A push-based stream of events, represented by its subscription function.
 *
 * `sub(publish, report, complete)` is invoked to start a subscription:
 *   - `publish(event)` is called for every event;
 *   - `report` is only ever forwarded to upstream subscriptions by the
 *     combinators below (presumably an error callback -- none of them
 *     call it themselves);
 *   - `complete()` is called when the signal has no more events.
 * The combinators treat the value returned by `sub` as a cancel function.
 *
 * @constructor
 * @param {Function} sub - The subscription function described above.
 */
function Signal(sub) {
  this.sub = sub;
}

/**
 * Builds a pair of completion callbacks that invoke `complete` once
 * both of them have been called.
 *
 * @param {Function} complete - Downstream completion callback.
 * @returns {{first: Function, second: Function}} The two callbacks.
 */
function completeWhenBothDone(complete) {
  var firstDone = false;
  var secondDone = false;
  return {
    first: function() {
      firstDone = true;
      if (secondDone) {
        complete();
      }
    },
    second: function() {
      secondDone = true;
      if (firstDone) {
        complete();
      }
    }
  };
}

/**
 * Calls `cancel` if it is a function. Subscription functions are not
 * forced to return a cancel function, so a missing one is ignored
 * instead of raising a TypeError.
 *
 * @param {*} cancel - Value returned by a subscription function.
 */
function invokeCancel(cancel) {
  if (typeof cancel === 'function') {
    cancel();
  }
}

Signal.prototype = {
  /**
   * Applies the given function over each event.
   *
   * @param {Function} fn - Maps an event to the event to publish.
   * @returns {Signal} Signal of the mapped events.
   */
  map: function(fn) {
    var sub = this.sub;
    return new Signal(function(publish, report, complete) {
      return sub(function(event) {
        publish(fn(event));
      }, report, complete);
    });
  },

  /**
   * Applies the given function to each event.
   * Each event for which it returns a truthy value is emitted.
   *
   * @param {Function} fn - Predicate called with each event.
   * @returns {Signal} Signal of the events that passed the predicate.
   */
  filter: function(fn) {
    var sub = this.sub;
    return new Signal(function(publish, report, complete) {
      return sub(function(event) {
        if (fn(event)) {
          publish(event);
        }
      }, report, complete);
    });
  },

  /**
   * Merges two signals. Events of both are published as they arrive;
   * the result completes once both inputs have completed.
   *
   * @param {Signal} that - The signal to merge with this one.
   * @returns {Signal} The merged signal.
   */
  merge: function(that) {
    var subThat = that.sub;
    var subThis = this.sub;
    return new Signal(function(publish, report, complete) {
      var done = completeWhenBothDone(complete);
      // `that` is subscribed before `this`.
      var cancelThat = subThat(publish, report, done.first);
      var cancelThis = subThis(publish, report, done.second);
      return function() {
        invokeCancel(cancelThat);
        invokeCancel(cancelThis);
      };
    });
  },

  /**
   * Parallelizes a signal of signals into
   * one signal by merging each event.
   *
   * Note: the returned cancel function is the parent's; children that
   * were already subscribed are not cancelled by it.
   *
   * @returns {Signal} Signal of all events of all child signals.
   */
  parallelize: function() {
    var sub = this.sub;
    return new Signal(function(publish, report, complete) {
      var completed = false;
      var uncompleted = 0;
      return sub(function(event) {
        // The counter is only correct if every child calls its
        // completion callback at most once.
        uncompleted++;
        event.sub(publish, report, function() {
          uncompleted--;
          if (uncompleted === 0 && completed) {
            complete();
          }
        });
      }, report, function() {
        // If there are no more uncompleted children,
        // complete the signal, otherwise mark the
        // parent as completed.
        if (uncompleted === 0) {
          complete();
        } else {
          completed = true;
        }
      });
    });
  },

  /**
   * Applies the given function over each
   * event and merges the resulting
   * signals.
   *
   * `signal.parallel(fn)` is equivalent
   * to `signal.map(fn).parallelize()`.
   *
   * @param {Function} fn - Maps an event to a signal.
   * @returns {Signal} The merged signal.
   */
  parallel: function(fn) {
    return this.map(fn).parallelize();
  },

  /**
   * Concatenates two signals: `that` is subscribed only once this
   * signal has completed.
   *
   * @param {Signal} that - The signal to append.
   * @returns {Signal} The concatenated signal.
   */
  concat: function(that) {
    var subThat = that.sub;
    var subThis = this.sub;
    return new Signal(function(publish, report, complete) {
      var thisCompleted = false;
      var cancelThat;
      // The two cancel functions are kept in separate variables: if this
      // signal completes synchronously, `subThat` returns before
      // `subThis` does, and a single shared variable would be
      // overwritten with the wrong cancel function.
      var cancelThis = subThis(publish, report, function() {
        thisCompleted = true;
        cancelThat = subThat(publish, report, complete);
      });
      return function() {
        invokeCancel(thisCompleted ? cancelThat : cancelThis);
      };
    });
  },

  /**
   * Flattens a signal of signals into one
   * signal by concatenating each event: children are subscribed one
   * at a time, in arrival order.
   *
   * Note: the returned cancel function is the parent's; the active
   * child and the queued children are not cancelled by it.
   *
   * @returns {Signal} Signal of the children's events, child by child.
   */
  flatten: function() {
    var sub = this.sub;
    return new Signal(function(publish, report, complete) {
      var completed = false;
      // Whether a child is currently subscribed. An empty queue alone
      // does not tell: the first child runs while the queue is empty.
      var active = false;
      var queue = [];

      // Completion callback shared by all children.
      function next() {
        if (queue.length !== 0) {
          // Children left in the queue: subscribe to the next one.
          queue.shift().sub(publish, report, next);
        } else {
          active = false;
          // No children waiting; finish if the parent is done too.
          if (completed) {
            complete();
          }
        }
      }

      return sub(function(event) {
        if (active) {
          queue.push(event);
        } else {
          active = true;
          event.sub(publish, report, next);
        }
      }, report, function() {
        // If a child is still running, only mark the parent as
        // completed; `next` completes the signal later.
        if (active) {
          completed = true;
        } else {
          complete();
        }
      });
    });
  },

  /**
   * Applies the given function over each
   * event and concatenates the resulting
   * signals.
   *
   * `signal.chain(fn)` is equivalent
   * to `signal.map(fn).flatten()`.
   *
   * @param {Function} fn - Maps an event to a signal.
   * @returns {Signal} The concatenated signal.
   */
  chain: function(fn) {
    return this.map(fn).flatten();
  },

  /**
   * Like `parallelize` except cancels
   * each signal before subscribing to
   * the next. Completion of a child is ignored; the result completes
   * (after cancelling the latest child) when the parent completes.
   *
   * Note: the returned cancel function is the parent's; the latest
   * child is not cancelled by it.
   *
   * @returns {Signal} Signal of the events of the latest child.
   */
  focus: function() {
    var sub = this.sub;
    return new Signal(function(publish, report, complete) {
      var cancelChild;
      return sub(function(event) {
        invokeCancel(cancelChild);
        cancelChild = event.sub(publish, report, function() {});
      }, report, function() {
        invokeCancel(cancelChild);
        complete();
      });
    });
  },

  /**
   * Like `parallel` except uses `focus`
   * for signal joining.
   *
   * @param {Function} fn - Maps an event to a signal.
   * @returns {Signal} Signal of the events of the latest mapped signal.
   */
  expand: function(fn) {
    return this.map(fn).focus();
  },

  /**
   * Zips events from two signals into one.
   * Uses a queue per signal to keep events in
   * order: the n-th event of this signal is paired with the n-th
   * event of `that`. Completes once both inputs have completed.
   *
   * @param {Signal} that - The signal to zip with this one.
   * @param {Function} fn - Called as `fn(thisEvent, thatEvent)`; its
   *   result is published.
   * @returns {Signal} The zipped signal.
   */
  zip: function(that, fn) {
    var subThat = that.sub;
    var subThis = this.sub;
    return new Signal(function(publish, report, complete) {
      var done = completeWhenBothDone(complete);
      var thatQueue = [];
      var thisQueue = [];
      var cancelThat = subThat(function(event) {
        if (thisQueue.length === 0) {
          thatQueue.push(event);
        } else {
          publish(fn(thisQueue.shift(), event));
        }
      }, report, done.first);
      var cancelThis = subThis(function(event) {
        if (thatQueue.length === 0) {
          thisQueue.push(event);
        } else {
          publish(fn(event, thatQueue.shift()));
        }
      }, report, done.second);
      return function() {
        invokeCancel(cancelThat);
        invokeCancel(cancelThis);
      };
    });
  },

  /**
   * Like `zip` except only applies latest
   * known events from each signal. Nothing is published until both
   * signals have emitted at least once.
   *
   * @param {Signal} that - The signal to combine with this one.
   * @param {Function} fn - Called as `fn(latestThis, latestThat)`; its
   *   result is published.
   * @returns {Signal} The combined signal.
   */
  combine: function(that, fn) {
    var subThat = that.sub;
    var subThis = this.sub;
    return new Signal(function(publish, report, complete) {
      var done = completeWhenBothDone(complete);
      // Unique sentinel meaning "no event seen yet"; unlike `undefined`
      // it can never be confused with a real event.
      var ref = {};
      var latestThat = ref;
      var latestThis = ref;
      var cancelThat = subThat(function(event) {
        latestThat = event;
        if (latestThis !== ref) {
          publish(fn(latestThis, latestThat));
        }
      }, report, done.first);
      var cancelThis = subThis(function(event) {
        latestThis = event;
        if (latestThat !== ref) {
          publish(fn(latestThis, latestThat));
        }
      }, report, done.second);
      return function() {
        invokeCancel(cancelThat);
        invokeCancel(cancelThis);
      };
    });
  }
};
/**
 * Creates a signal that publishes `event` once and then completes,
 * both synchronously while subscribing.
 *
 * @param {*} event - The single event to publish.
 * @returns {Signal} Signal of exactly that event.
 */
Signal.of = function(event) {
  return new Signal(function(publish, report, complete) {
    publish(event);
    complete();
    // Everything already happened, so there is nothing to cancel; a
    // no-op is still returned because the value returned by a
    // subscription is invoked as a cancel function by callers.
    return function() {};
  });
};
/**
 * Wraps an event emitter as a signal: every subscriber is registered
 * as a listener for `eventName`, and the returned cancel function
 * removes that listener again. The signal never calls `report` or
 * `complete`.
 *
 * The registration method is the first one present among `on`,
 * `addListener` and `addEventListener`; the removal method is the
 * first among `off`, `removeListener` and `removeEventListener`.
 *
 * @param {Object} emitter - Object exposing one method of each group.
 * @param {string} eventName - Name of the event to listen for.
 * @returns {Signal} Signal of whatever the emitter passes to listeners.
 */
Signal.fromEventEmitter = function(emitter, eventName) {
  // Both methods are looked up once, when the signal is created.
  var attach = emitter.on || emitter.addListener || emitter.addEventListener;
  var detach = emitter.off || emitter.removeListener || emitter.removeEventListener;

  function listenTo(listener) {
    attach.call(emitter, eventName, listener);
    return function stopListening() {
      detach.call(emitter, eventName, listener);
    };
  }

  return new Signal(listenTo);
};
module.exports = Signal;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment