Last active July 31, 2017
Creates a most.js UMD module with creed's ultra-fast Promise implementation packaged in as the Promise shim.


Creates a most.js UMD module with creed's ultra-fast Promise implementation packaged in as the Promise shim.

Install it

The following command installs rollup and some plugins.

npm install

Build it

The following command runs rollup, which creates most-with-creed.js:

npm run build

Customize it

To build an AMD or CommonJS variant instead of the default UMD bundle, change the output format option in the rollup.config.js file.

import { Promise as CreedPromise } from 'creed'
export * from 'most'
(function (global, factory) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports) :
typeof define === 'function' && define.amd ? define(['exports'], factory) :
(factory((global.most = global.most || {})));
}(this, (function (exports) { 'use strict';
var Promise = function () {}; // create a phony local promise impl
/* eslint no-multi-spaces: 0 */
// Promise state bit flags. Each state owns one bit, so states can be
// combined with `|` and tested with `&`.
var PENDING = 1 << 0;
var FULFILLED = 1 << 1;
var REJECTED = 1 << 2;
// A promise is settled once it is either fulfilled or rejected.
// isSettled() reads this mask, so it must be defined alongside the flags.
var SETTLED = FULFILLED | REJECTED;
var NEVER = 1 << 3;
var HANDLED = 1 << 4;

/**
 * @param {{state: function(): number}} p promise exposing its state bits
 * @returns {boolean} true when the FULFILLED bit is set
 */
function isFulfilled (p) {
  return (p.state() & FULFILLED) > 0;
}

/**
 * @param {{state: function(): number}} p promise exposing its state bits
 * @returns {boolean} true when the REJECTED bit is set
 */
function isRejected (p) {
  return (p.state() & REJECTED) > 0;
}

/**
 * @param {{state: function(): number}} p promise exposing its state bits
 * @returns {boolean} true when either the FULFILLED or REJECTED bit is set
 */
function isSettled (p) {
  return (p.state() & SETTLED) > 0;
}

/**
 * @param {{state: function(): number}} p promise exposing its state bits
 * @returns {boolean} true when the NEVER bit is set
 */
function isNever (p) {
  return (p.state() & NEVER) > 0;
}

/**
 * @param {{state: function(): number}} p promise exposing its state bits
 * @returns {boolean} true when the HANDLED bit is set
 */
function isHandled (p) {
  return (p.state() & HANDLED) > 0;
}
/**
 * Mark a rejection as handled by running the `silencer` action against it.
 *
 * @param {Object} p promise exposing `_runAction(action)`
 */
function silenceError (p) {
  p._runAction(silencer);
}

// implements Action
var silencer = {
  // Nothing to silence on a fulfilled promise.
  fulfilled: function fulfilled () {},
  // Set the HANDLED bit on the rejected promise's state.
  rejected: function rejected (p) {
    p._state |= HANDLED;
  }
};
/* global process,MutationObserver,WebKitMutationObserver */
// True only when `process` exists and is tagged '[object process]'.
var isNode = typeof process !== 'undefined' &&
  Object.prototype.toString.call(process) === '[object process]';

/* istanbul ignore next */
// The MutationObserver constructor (standard or WebKit-prefixed), or false.
var MutationObs = (typeof MutationObserver === 'function' && MutationObserver) ||
  (typeof WebKitMutationObserver === 'function' && WebKitMutationObserver);

// Read an environment variable; yields false when isNode is false.
var getenv = function (name) { return isNode && process.env[name]; };

// Truthy when CREED_DEBUG is set, or NODE_ENV is 'development' or 'test'.
var isDebug = getenv('CREED_DEBUG') ||
  getenv('NODE_ENV') === 'development' ||
  getenv('NODE_ENV') === 'test';
/* global process,document */
/**
 * Wrap `f` in a scheduler chosen by environment: the Node scheduler when
 * `isNode` is truthy, the browser scheduler when `MutationObs` is truthy,
 * and the fallback scheduler otherwise.
 *
 * @param {Function} f function to schedule
 * @returns {*} whatever the selected scheduler factory returns
 */
var makeAsync = function (f) {
  return isNode ? createNodeScheduler(f) /* istanbul ignore next */
    : MutationObs ? createBrowserScheduler(f)
    : createFallbackScheduler(f);
};
/* istanbul ignore next */
/**
 * Scheduler that defers `f` with a zero-delay timer.
 *
 * @param {Function} f function to schedule
 * @returns {Function} trigger; each call returns the `setTimeout` handle
 */
function createFallbackScheduler (f) {
  return function () { return setTimeout(f, 0); };
}
/**
 * Scheduler that defers `f` with `process.nextTick`.
 *
 * @param {Function} f function to schedule
 * @returns {Function} trigger; each call queues `f` once
 */
function createNodeScheduler (f) {
  return function () { return process.nextTick(f); };
}
/* istanbul ignore next */
/**
 * Scheduler built on a mutation observer. `f` is registered as the
 * observer callback for character-data changes on a detached text node;
 * the returned trigger flips that node's data between 1 and 0 so every
 * call produces a change.
 *
 * @param {Function} f function to schedule
 * @returns {Function} trigger that mutates the observed node
 */
function createBrowserScheduler (f) {
  var node = document.createTextNode('');
  new MutationObs(f).observe(node, { characterData: true });
  var i = 0;
  return function () { node.data = (i ^= 1); };
}
/**
 * FIFO queue of tasks (objects with a `run()` method) that is drained
 * asynchronously through the scheduler returned by `makeAsync`.
 */
var TaskQueue = function TaskQueue () {
  var this$1 = this;
  this.tasks = new Array(2 << 15);
  this.length = 0;
  this.drain = makeAsync(function () { return this$1._drain(); });
};

/**
 * Enqueue a task. A drain is scheduled only when the queue goes from empty
 * to non-empty; tasks added later ride along with the drain already pending.
 *
 * @param {{run: Function}} task
 */
TaskQueue.prototype.add = function add (task) {
  if (this.length === 0) {
    this.drain();
  }
  this.tasks[this.length++] = task;
};

/**
 * Run every queued task in insertion order, clearing each slot after it
 * runs, then reset the queue. `this.length` is re-read on every iteration,
 * so tasks enqueued while draining run in the same pass.
 */
TaskQueue.prototype._drain = function _drain () {
  var q = this.tasks;
  for (var i = 0; i < this.length; ++i) {
    q[i].run();
    q[i] = void 0;
  }
  this.length = 0;
};
var noop = function () {};

// WARNING: shared mutable notion of "current context"
var _currentContext;
// Context factory; a no-op until tracing installs a real one.
var _createContext = noop;

// Get the current context
var peekContext = function () { return _currentContext; };

// Ask the installed factory for a context derived from the current one.
// Returns whatever the factory returns (undefined for the default no-op).
var pushContext = function (at, tag) { return _createContext(_currentContext, at, tag); };

// Set the current context to the provided one, returning the
// previously current context (which makes it easy to swap back
// to it)
var swapContext = function (context) {
  var previousContext = _currentContext;
  _currentContext = context;
  return previousContext;
};
// Enable context tracing. Must provide:
// createContext :: c -> Function -> String -> c
// Given the current context, and a function and string tag representing a new context,
// return a new current context
// initialContext :: c
// An initial current context
var traceAsync = function (createContext, initialContext) {
  _createContext = createContext;
  _currentContext = initialContext;
};

// Enable default context tracing: install the default `createContext`
// factory and reset the current context to undefined.
var enableAsyncTraces = function () { return traceAsync(createContext, undefined); };
// ------------------------------------------------------
// Default context tracing

// Default context factory: links a new Context to the current one.
// When no tag is supplied, the originating function's name is used.
var createContext = function (currentContext, at, tag) {
  return new Context(currentContext, tag || at.name, at);
};

// Error.captureStackTrace where the engine provides it, otherwise a no-op.
var captureStackTrace = Error.captureStackTrace || noop;

/**
 * One node in a linked list of async contexts.
 *
 * @param {Context|undefined} next the context this one was created from
 * @param {string} tag label used when formatting the trace
 * @param {Function} at function passed to captureStackTrace
 */
var Context = function Context (next, tag, at) {
  this.next = next;
  this.tag = tag;
  captureStackTrace(this, at);
};

// Heading line for this context's section of a combined trace.
Context.prototype.toString = function toString () {
  return this.tag ? (" from " + (this.tag) + ":") : ' from previous context:';
};
// ------------------------------------------------------
// Default context formatting

// If context provided, attach an async trace for it.
// Otherwise, do nothing.
var attachTrace = function (e, context) { return context != null ? formatTrace(e, context) : e; };

// If e is an Error, attach an async trace to e for the provided context
// Otherwise, do nothing
// The original stack is preserved on `_creed$OriginalStack`; that property
// also acts as a marker so an error is only ever formatted once.
function formatTrace (e, context) {
  if (e instanceof Error && !('_creed$OriginalStack' in e)) {
    e._creed$OriginalStack = e.stack;
    e.stack = formatContext(elideTrace(e.stack), context);
  }
  return e;
}
// Fold context list into a newline-separated, combined async trace
// Walks the chain through each context's `next` link. A context whose
// elided stack contains no ' at ' frame contributes nothing.
function formatContext (trace, context) {
  if (context == null) {
    return trace;
  }
  var s = elideTrace(context.stack);
  return formatContext(s.indexOf(' at ') < 0 ? trace : (trace + '\n' + s), context.next);
}
var elideTraceRx =
// Remove internal stack frames
var elideTrace = function (stack) { return typeof stack === 'string' ? stack.replace(elideTraceRx, '') : ''; };
var UNHANDLED_REJECTION = 'unhandledRejection';
var HANDLED_REJECTION = 'rejectionHandled';
var ErrorHandler = function ErrorHandler (emitEvent, reportError) {
this.rejections = [];
this.emit = emitEvent;
this.reportError = reportError;
ErrorHandler.prototype.track = function track (rejected) {
var e = attachTrace(rejected.value, rejected.context);
if (!this.emit(UNHANDLED_REJECTION, rejected, e)) {
/* istanbul ignore else */
if (this.rejections.length === 0) {
setTimeout(reportErrors, 1, this.reportError, this.rejections);
ErrorHandler.prototype.untrack = function untrack (rejected) {
this.emit(HANDLED_REJECTION, rejected);
function reportErrors (report, rejections) {
try {
reportAll(rejections, report);
} finally {
rejections.length = 0;
function reportAll (rejections, report) {
for (var i = 0; i < rejections.length; ++i) {
var rejected = rejections[i];
/* istanbul ignore else */
if (!isHandled(rejected)) {
var UNHANDLED_REJECTION$1 = 'unhandledRejection';

/**
 * Build the function used to announce rejection events to the host.
 *
 * - Under Node (when `process.emit` is a function) events go through
 *   `process.emit`; 'unhandledRejection' carries `(error.value, error)`,
 *   any other type carries `(error)`.
 * - Where `self` and `CustomEvent` exist, events are dispatched on `self`
 *   as non-bubbling, cancelable CustomEvents, and the emitter returns true
 *   when a listener cancelled the event.
 * - Otherwise a no-op is returned.
 *
 * @returns {Function} emitter `(type, error) => result`
 */
var makeEmitError = function () {
  /* global process, self, CustomEvent */
  // istanbul ignore else */
  if (isNode && typeof process.emit === 'function') {
    // Returning falsy here means to call the default reportRejection API.
    // This is safe even in browserify since process.emit always returns
    // falsy in browserify:
    return function (type, error) {
      return type === UNHANDLED_REJECTION$1
        ? process.emit(type, error.value, error)
        : process.emit(type, error);
    };
  } else if (typeof self !== 'undefined' && typeof CustomEvent === 'function') {
    return (function (noop, self, CustomEvent) {
      // Probe that CustomEvent can actually be constructed before using it.
      var hasCustomEvent;
      try {
        hasCustomEvent = new CustomEvent(UNHANDLED_REJECTION$1) instanceof CustomEvent;
      } catch (e) {
        hasCustomEvent = false;
      }

      return !hasCustomEvent ? noop : function (type, error) {
        var ev = new CustomEvent(type, {
          detail: {
            reason: error.value,
            promise: error
          },
          bubbles: false,
          cancelable: true
        });

        return !self.dispatchEvent(ev);
      };
    }(noop$1, self, CustomEvent));
  }

  // istanbul ignore next */
  return noop$1;
};

// istanbul ignore next */
function noop$1 () {}
// maybeThenable :: * -> boolean
/**
 * Cheap pre-check for thenables: true for any non-null object or function.
 * It does not look at `x.then`.
 *
 * @param {*} x
 * @returns {boolean}
 */
function maybeThenable (x) {
  return (typeof x === 'object' || typeof x === 'function') && x !== null;
}
var Action = function Action (promise) {
this.promise = promise;
this.context = pushContext(this.constructor);
// default onFulfilled action
/* istanbul ignore next */
Action.prototype.fulfilled = function fulfilled (p) {
// default onRejected action
Action.prototype.rejected = function rejected (p) {
return false
/**
 * Call `f(x)` and pass the result to `handle(promise, result)`. Only
 * errors thrown by `f` are intercepted; they reject `promise` and `handle`
 * is skipped. Errors thrown by `handle` propagate to the caller.
 *
 * @param {Function} f function to call
 * @param {*} x argument for `f`
 * @param {Function} handle receives `(promise, result)` on success
 * @param {Object} promise target exposing `_reject(e)`
 */
function tryCall (f, x, handle, promise) {
  var result;
  // test if `f` (and only it) throws
  try {
    result = f(x);
  } catch (e) {
    promise._reject(e);
    return;
  } // else
  handle(promise, result);
}
/**
 * Register a Then action on `p` that will settle `promise`.
 *
 * @param {*} f fulfillment handler
 * @param {*} r rejection handler
 * @param {Object} p source promise exposing `_when(action)`
 * @param {Object} promise derived promise
 * @returns {Object} `promise`
 */
function then (f, r, p, promise) {
  p._when(new Then(f, r, promise));
  return promise;
}
/**
 * Action behind `then(f, r)`: runs `f` on fulfillment or `r` on rejection
 * and settles the derived promise from the outcome.
 */
var Then = (function (Action$$1) {
  function Then (f, r, promise) {
    Action$$1.call(this, promise);
    this.f = f;
    this.r = r;
  }

  if ( Action$$1 ) Then.__proto__ = Action$$1;
  Then.prototype = Object.create( Action$$1 && Action$$1.prototype );
  Then.prototype.constructor = Then;

  Then.prototype.fulfilled = function fulfilled (p) {
    this.runThen(this.f, p);
  };

  // Returns true when a rejection handler was actually invoked.
  Then.prototype.rejected = function rejected (p) {
    return this.runThen(this.r, p);
  };

  /**
   * @param {*} f handler for the settled state of `p`
   * @param {Object} p settled source promise
   * @returns {boolean} true when `f` was a function and was invoked
   */
  Then.prototype.runThen = function runThen (f, p) {
    if (typeof f !== 'function') {
      // No handler for this outcome: the derived promise must take on the
      // source's state (Promises/A+ 2.2.7.3 and 2.2.7.4) instead of staying
      // pending forever.
      this.promise._become(p);
      return false;
    }
    tryCall(f, p.value, handleThen, this.promise);
    return true;
  };

  return Then;
}(Action));
function handleThen (promise, result) {
/**
 * Register a Map action on `p` that will settle `promise`.
 *
 * @param {Function} f mapping function
 * @param {Object} p source promise exposing `_when(action)`
 * @param {Object} promise derived promise
 * @returns {Object} `promise`
 */
var map = function (f, p, promise) {
  p._when(new Map(f, promise));
  return promise;
};
/**
 * Action behind `map(f)`: applies `f` to the fulfillment value and hands
 * the result to `handleMap`. Rejections use the inherited Action default.
 */
var Map = (function (Action$$1) {
  function Map (f, promise) {
    Action$$1.call(this, promise);
    this.f = f;
  }

  if ( Action$$1 ) Map.__proto__ = Action$$1;
  Map.prototype = Object.create( Action$$1 && Action$$1.prototype );
  Map.prototype.constructor = Map;

  Map.prototype.fulfilled = function fulfilled (p) {
    tryCall(this.f, p.value, handleMap, this.promise);
  };

  return Map;
}(Action));
function handleMap (promise, result) {
/**
 * Register a Bimap action on `p` that will settle `promise`.
 *
 * @param {Function} r maps the rejection reason
 * @param {Function} f maps the fulfillment value
 * @param {Object} p source promise exposing `_when(action)`
 * @param {Object} promise derived promise
 * @returns {Object} `promise`
 */
var bimap = function (r, f, p, promise) {
  p._when(new Bimap(r, f, promise));
  return promise;
};
/**
 * Action behind `bimap(r, f)`: extends Map (which applies `f` on
 * fulfillment) with a rejection branch that applies `r` to the reason and
 * hands the result to `handleMapRejected`.
 */
var Bimap = (function (Map$$1) {
  function Bimap (r, f, promise) {
    Map$$1.call(this, f, promise);
    this.r = r;
  }

  if ( Map$$1 ) Bimap.__proto__ = Map$$1;
  Bimap.prototype = Object.create( Map$$1 && Map$$1.prototype );
  Bimap.prototype.constructor = Bimap;

  Bimap.prototype.rejected = function rejected (p) {
    tryCall(this.r, p.value, handleMapRejected, this.promise);
  };

  return Bimap;
}(Map));
function handleMapRejected (promise, result) {
/**
 * Register a Chain action on `p` that will settle `promise`.
 *
 * @param {Function} f function applied to the fulfillment value
 * @param {Object} p source promise exposing `_when(action)`
 * @param {Object} promise derived promise
 * @returns {Object} `promise`
 */
var chain = function (f, p, promise) {
  p._when(new Chain(f, promise));
  return promise;
};
/**
 * Action behind `chain(f)`: applies `f` to the fulfillment value and hands
 * the result to `handleChain`. Rejections use the inherited Action default.
 */
var Chain = (function (Action$$1) {
  function Chain (f, promise) {
    Action$$1.call(this, promise);
    this.f = f;
  }

  if ( Action$$1 ) Chain.__proto__ = Action$$1;
  Chain.prototype = Object.create( Action$$1 && Action$$1.prototype );
  Chain.prototype.constructor = Chain;

  Chain.prototype.fulfilled = function fulfilled (p) {
    tryCall(this.f, p.value, handleChain, this.promise);
  };

  return Chain;
}(Action));
function handleChain (promise, result) {
if (!(maybeThenable(result) && typeof result.then === 'function')) {
promise._reject(new TypeError('f must return a promise'));
var Race = function Race (never) {
this.never = never;
Race.prototype.valueAt = function valueAt (x, i, promise) {
Race.prototype.fulfillAt = function fulfillAt (p, i, promise) {
Race.prototype.rejectAt = function rejectAt (p, i, promise) {
// In the case where the result promise has been resolved
// need to silence all subsequently seen rejections
promise._isResolved() ? silenceError(p) : promise._become(p);
Race.prototype.complete = function complete (total, promise) {
if (total === 0) {
/**
 * Iterable handler that collects one value per input slot and, once every
 * slot is filled, passes the collected results to `mergeHandler.merge`.
 *
 * `pending` is decremented as values arrive and raised by the total once
 * iteration finishes, so it only reaches zero after both have happened.
 *
 * @param {{merge: Function}} mergeHandler receives `(promise, results)`
 * @param {Array} results storage for the collected values
 */
var Merge = function Merge (mergeHandler, results) {
  this.pending = 0;
  this.results = results;
  this.mergeHandler = mergeHandler;
};

Merge.prototype.valueAt = function valueAt (x, i, promise) {
  this.results[i] = x;
  this.check(this.pending - 1, promise);
};

Merge.prototype.fulfillAt = function fulfillAt (p, i, promise) {
  this.valueAt(p.value, i, promise);
};

Merge.prototype.rejectAt = function rejectAt (p, i, promise) {
  // In the case where the result promise has been resolved
  // need to silence all subsequently seen rejections
  promise._isResolved() ? silenceError(p) : promise._become(p);
};

Merge.prototype.complete = function complete (total, promise) {
  this.check(this.pending + total, promise);
};

Merge.prototype.check = function check (pending, promise) {
  this.pending = pending;
  if (pending === 0) {
    this.mergeHandler.merge(promise, this.results);
  }
};
/**
 * Allocate storage for per-item results: pre-sized when the input is an
 * array, an empty array for any other input.
 *
 * @param {*} iterable
 * @returns {Array}
 */
function resultsArray (iterable) {
  return Array.isArray(iterable) ? new Array(iterable.length) : [];
}
/**
 * Feed every item of `promises` to `handler`, using the array fast path
 * when possible. An error thrown while iterating rejects `promise` rather
 * than being silently discarded.
 *
 * @param {Function} resolve converts an item to a promise
 * @param {Object} handler per-item handler
 * @param {Array|Iterable} promises input items
 * @param {Object} promise result promise exposing `_reject(e)` and `near()`
 * @returns {*} `promise.near()`
 */
function resolveIterable (resolve, handler, promises, promise) {
  var run = Array.isArray(promises) ? runArray : runIterable;
  try {
    run(resolve, handler, promises, promise);
  } catch (e) {
    promise._reject(e);
  }
  return promise.near();
}
/**
 * Array fast path: handle each element by index, then report the number of
 * elements seen to `handler.complete`.
 *
 * @param {Function} resolve converts an item to a promise
 * @param {Object} handler per-item handler exposing `complete(total, promise)`
 * @param {Array} promises input items
 * @param {Object} promise result promise
 */
function runArray (resolve, handler, promises, promise) {
  var i = 0;

  for (; i < promises.length; ++i) {
    handleItem(resolve, handler, promises[i], i, promise);
  }

  handler.complete(i, promise);
}
/**
 * Generic path: pull items through the iterator protocol, handling each
 * with a running index, then report the number of items seen to
 * `handler.complete`.
 *
 * @param {Function} resolve converts an item to a promise
 * @param {Object} handler per-item handler exposing `complete(total, promise)`
 * @param {Iterable} promises input items
 * @param {Object} promise result promise
 */
function runIterable (resolve, handler, promises, promise) {
  var i = 0;
  var iter = promises[Symbol.iterator]();

  while (true) {
    var step = iter.next();
    if (step.done) {
      break;
    }
    handleItem(resolve, handler, step.value, i++, promise);
  }

  handler.complete(i, promise);
}
/**
 * Route one input item to the right handler callback.
 *
 * - Values that cannot be thenables go straight to `handler.valueAt`.
 * - Otherwise the item is resolved to a promise. If the result promise is
 *   already resolved, a non-fulfilled input is silenced so its rejection is
 *   not reported; else fulfilled and rejected inputs are dispatched
 *   immediately, and anything else gets an AtIndex action attached.
 *
 * @param {Function} resolve converts an item to a promise
 * @param {Object} handler exposes `valueAt`, `fulfillAt`, `rejectAt`
 * @param {*} x the item
 * @param {number} i the item's index
 * @param {Object} promise result promise exposing `_isResolved()`
 */
function handleItem (resolve, handler, x, i, promise) {
  /* eslint complexity:[1,6] */
  if (!maybeThenable(x)) {
    handler.valueAt(x, i, promise);
    // Plain values are fully handled; do not also treat them as promises.
    return;
  }

  var p = resolve(x);

  if (promise._isResolved()) {
    if (!isFulfilled(p)) {
      silenceError(p);
    }
  } else if (isFulfilled(p)) {
    handler.fulfillAt(p, i, promise);
  } else if (isRejected(p)) {
    handler.rejectAt(p, i, promise);
  } else {
    p._runAction(new AtIndex(handler, i, promise));
  }
}
/**
 * Action that forwards a promise's eventual outcome to an iterable
 * handler, tagged with the index the promise occupied in the input.
 */
var AtIndex = (function (Action$$1) {
  function AtIndex (handler, i, promise) {
    Action$$1.call(this, promise);
    this.i = i;
    this.handler = handler;
  }

  if ( Action$$1 ) AtIndex.__proto__ = Action$$1;
  AtIndex.prototype = Object.create( Action$$1 && Action$$1.prototype );
  AtIndex.prototype.constructor = AtIndex;

  AtIndex.prototype.fulfilled = function fulfilled (p) {
    this.handler.fulfillAt(p, this.i, this.promise);
  };

  // Passes through whatever the handler's rejectAt returns.
  AtIndex.prototype.rejected = function rejected (p) {
    return this.handler.rejectAt(p, this.i, this.promise);
  };

  return AtIndex;
}(Action));
/**
 * Evaluate a CommonJS-style module body inside the bundle.
 *
 * @param {function(Object, Object)} fn module body, called with
 *   `(module, module.exports)`
 * @param {Object} [module] ignored on input; used as a local for the
 *   fresh module record
 * @returns {*} `module.exports` as left by `fn`
 */
function createCommonjsModule (fn, module) {
  module = { exports: {} };
  fn(module, module.exports);
  return module.exports;
}
var index = createCommonjsModule(function (module) {
(function() {
'use strict';
/* eslint comma-dangle: ["off"], no-var: ["off"], strict: ["error", "function"] */
/* global self */
var mapping = {
equals: 'fantasy-land/equals',
concat: 'fantasy-land/concat',
empty: 'fantasy-land/empty',
map: 'fantasy-land/map',
ap: 'fantasy-land/ap',
of: 'fantasy-land/of',
alt: 'fantasy-land/alt',
zero: 'fantasy-land/zero',
reduce: 'fantasy-land/reduce',
traverse: 'fantasy-land/traverse',
chain: 'fantasy-land/chain',
chainRec: 'fantasy-land/chainRec',
extend: 'fantasy-land/extend',
extract: 'fantasy-land/extract',
bimap: 'fantasy-land/bimap',
promap: 'fantasy-land/promap'
module.exports = mapping;
// Module-wide task queue instance.
var taskQueue = new TaskQueue();

/* istanbul ignore next */
// Default reporter: rethrow the value carried by the rejection.
var handleError = function (rejection) {
  throw rejection.value;
};

/* istanbul ignore next */
// Module-wide error handler, wired to the host's event emitter and the
// rethrowing reporter above.
var errorHandler = new ErrorHandler(makeEmitError(), handleError);
// -------------------------------------------------------------
// ## Types
// -------------------------------------------------------------
// Internal base type, provides fantasy-land namespace
// and type representative
var Core = function Core () {
this.context = peekContext();
// empty :: Promise e a
Core.empty = function empty () {
return never()
// of :: a -> Promise e a
Core.of = function of (x) {
return fulfill(x)
Core[index.empty] = function () {
return never()
Core[index.of] = function (x) {
return fulfill(x)
Core.prototype[] = function (f) {
Core.prototype[index.bimap] = function (r, f) {
return this.bimap(r, f)
Core.prototype[index.ap] = function (pf) {
return pf.ap(this)
Core.prototype[index.chain] = function (f) {
return this.chain(f)
Core.prototype[index.concat] = function (p) {
return this.concat(p)
Core.prototype[index.alt] = function (p) {
return this.or(p)
Core[] = function () {
return never()
// @deprecated The name concat is deprecated, use or() instead.
Core.prototype.concat = function concat (b) {
return this.or(b)
// data Promise e a where
// Future :: Promise e a
// Fulfilled :: a -> Promise e a
// Rejected :: Error e => e -> Promise e a
// Never :: Promise e a
// Future :: Promise e a
// A promise whose value cannot be known until some future time
var Future = (function (Core) {
function Future () {;
this.ref = void 0;
this.action = void 0;
this.length = 0;
if ( Core ) Future.__proto__ = Core;
Future.prototype = Object.create( Core && Core.prototype );
Future.prototype.constructor = Future;
// then :: Promise e a -> (a -> b) -> Promise e b
// then :: Promise e a -> () -> (e -> b) -> Promise e b
// then :: Promise e a -> (a -> b) -> (e -> b) -> Promise e b
Future.prototype.then = function then$1 (f, r) {
var n = this.near();
return n === this ? then(f, r, this, new Future()) : n.then(f, r)
// catch :: Promise e a -> (e -> b) -> Promise e b
Future.prototype.catch = function catch$1 (r) {
var n = this.near();
return n === this ? then(void 0, r, this, new Future()) : n.catch(r)
// map :: Promise e a -> (a -> b) -> Promise e b = function map$1 (f) {
var n = this.near();
return n === this ? map(f, this, new Future()) :
Future.prototype.bimap = function bimap$1 (r, f) {
var n = this.near();
return n === this
? bimap(r, f, this, new Future())
: n.bimap(r, f)
// ap :: Promise e (a -> b) -> Promise e a -> Promise e b
Future.prototype.ap = function ap (p) {
var n = this.near();
var pn = p.near();
return n === this ? this.chain(function (f) { return; }) : n.ap(pn)
// chain :: Promise e a -> (a -> Promise e b) -> Promise e b
Future.prototype.chain = function chain$1 (f) {
var n = this.near();
return n === this ? chain(f, this, new Future()) : n.chain(f)
// or :: Promise e a -> Promise e a -> Promise e a
Future.prototype.or = function or (b) {
/* eslint complexity:[2,5] */
var n = this.near();
var bn = b.near();
return isSettled(n) || isNever(bn) ? n
: isSettled(bn) || isNever(n) ? bn
: race([n, bn])
// toString :: Promise e a -> String
Future.prototype.toString = function toString () {
return '[object ' + this.inspect() + ']'
// inspect :: Promise e a -> String
Future.prototype.inspect = function inspect () {
var n = this.near();
return n === this ? 'Promise { pending }' : n.inspect()
// near :: Promise e a -> Promise e a
Future.prototype.near = function near () {
if (!this._isResolved()) {
return this
this.ref = this.ref.near();
return this.ref
// state :: Promise e a -> Int
Future.prototype.state = function state () {
return this._isResolved() ? this.ref.near().state() : PENDING
Future.prototype._isResolved = function _isResolved () {
return this.ref !== void 0
Future.prototype._when = function _when (action) {
Future.prototype._runAction = function _runAction (action) {
if (this.action === void 0) {
this.action = action;
} else {
this[this.length++] = action;
Future.prototype._resolve = function _resolve (x) {
Future.prototype._fulfill = function _fulfill (x) {
this._become(new Fulfilled(x));
Future.prototype._reject = function _reject (e) {
if (this._isResolved()) {
this.__become(new Rejected(e));
Future.prototype._become = function _become (p) {
if (this._isResolved()) {
Future.prototype.__become = function __become (p) {
this.ref = p === this ? cycle() : p;
if (this.action === void 0) {
}; = function run () {
var this$1 = this;
var p = this.ref.near();
this.action = void 0;
for (var i = 0; i < this.length; ++i) {
this$1[i] = void 0;
return Future;
// Fulfilled :: a -> Promise e a
// A promise whose value is already known
var Fulfilled = (function (Core) {
function Fulfilled (x) {;
this.value = x;
if ( Core ) Fulfilled.__proto__ = Core;
Fulfilled.prototype = Object.create( Core && Core.prototype );
Fulfilled.prototype.constructor = Fulfilled;
Fulfilled.prototype.then = function then$2 (f) {
return typeof f === 'function' ? then(f, void 0, this, new Future()) : this
Fulfilled.prototype.catch = function catch$2 () {
return this
}; = function map$2 (f) {
return map(f, this, new Future())
Fulfilled.prototype.bimap = function bimap$$1 (_, f) {
Fulfilled.prototype.ap = function ap (p) {
Fulfilled.prototype.chain = function chain$2 (f) {
return chain(f, this, new Future())
Fulfilled.prototype.or = function or () {
return this
Fulfilled.prototype.toString = function toString () {
return '[object ' + this.inspect() + ']'
Fulfilled.prototype.inspect = function inspect () {
return 'Promise { fulfilled: ' + this.value + ' }'
Fulfilled.prototype.state = function state () {
Fulfilled.prototype.near = function near () {
return this
Fulfilled.prototype._when = function _when (action) {
taskQueue.add(new Continuation(action, this));
Fulfilled.prototype._runAction = function _runAction (action) {
var c = swapContext(action.context);
return Fulfilled;
// Rejected :: Error e => e -> Promise e a
// A promise whose value cannot be known due to some reason/error
var Rejected = (function (Core) {
function Rejected (e) {;
this.value = e;
this._state = REJECTED;
if ( Core ) Rejected.__proto__ = Core;
Rejected.prototype = Object.create( Core && Core.prototype );
Rejected.prototype.constructor = Rejected;
Rejected.prototype.then = function then$$1 (_, r) {
return typeof r === 'function' ? this.catch(r) : this
Rejected.prototype.catch = function catch$3 (r) {
return then(void 0, r, this, new Future())
}; = function map$$1 () {
return this
Rejected.prototype.bimap = function bimap$2 (r) {
return bimap(r, void 0, this, new Future())
Rejected.prototype.ap = function ap () {
return this
Rejected.prototype.chain = function chain$$1 () {
return this
Rejected.prototype.or = function or () {
return this
Rejected.prototype.toString = function toString () {
return '[object ' + this.inspect() + ']'
Rejected.prototype.inspect = function inspect () {
return 'Promise { rejected: ' + this.value + ' }'
Rejected.prototype.state = function state () {
return this._state
Rejected.prototype.near = function near () {
return this
Rejected.prototype._when = function _when (action) {
taskQueue.add(new Continuation(action, this));
Rejected.prototype._runAction = function _runAction (action) {
var c = swapContext(action.context);
if (action.rejected(this)) {
return Rejected;
// Never :: Promise e a
// A promise that waits forever for its value to be known
// Every transformation returns the same instance, `or` yields the
// alternative, and registering actions is a no-op.
var Never = (function (Core) {
  function Never () {
    Core.apply(this, arguments);
  }

  if ( Core ) Never.__proto__ = Core;
  Never.prototype = Object.create( Core && Core.prototype );
  Never.prototype.constructor = Never;

  Never.prototype.then = function then$$1 () {
    return this;
  };

  Never.prototype.catch = function catch$4 () {
    return this;
  };

  Never.prototype.map = function map$$1 () {
    return this;
  };

  Never.prototype.bimap = function bimap$$1 () {
    return this;
  };

  Never.prototype.ap = function ap () {
    return this;
  };

  Never.prototype.chain = function chain$$1 () {
    return this;
  };

  // A never-settling promise always loses to the alternative.
  Never.prototype.or = function or (b) {
    return b;
  };

  Never.prototype.toString = function toString () {
    return '[object ' + this.inspect() + ']';
  };

  Never.prototype.inspect = function inspect () {
    return 'Promise { never }';
  };

  Never.prototype.state = function state () {
    return NEVER;
  };

  Never.prototype.near = function near () {
    return this;
  };

  // Actions can never run, so there is nothing to record.
  Never.prototype._when = function _when () {
  };

  Never.prototype._runAction = function _runAction () {
  };

  return Never;
}(Core));
// -------------------------------------------------------------
// ## Creating promises
// -------------------------------------------------------------
// resolve :: Thenable e a -> Promise e a
// resolve :: a -> Promise e a
// Promises from this library are returned via near(); other objects and
// functions are probed for a `then` method; everything else is wrapped in
// a Fulfilled.
function resolve (x) {
  return isPromise(x) ? x.near()
    : maybeThenable(x) ? refForMaybeThenable(fulfill, x)
    : new Fulfilled(x);
}
// reject :: e -> Promise e a
// Create a promise rejected with reason `e`.
function reject (e) {
  return new Rejected(e);
}
// never :: Promise e a
// Create a promise that never settles.
function never () {
  return new Never();
}
// fulfill :: a -> Promise e a
// Create a promise fulfilled with `x` exactly as given (no thenable check).
function fulfill (x) {
  return new Fulfilled(x);
}
// -------------------------------------------------------------
// ## Iterables
// -------------------------------------------------------------
// all :: Iterable (Promise e a) -> Promise e [a]
// Collect the inputs through a Merge handler that reports to `allHandler`.
function all (promises) {
  var handler = new Merge(allHandler, resultsArray(promises));
  return iterablePromise(handler, promises);
}
var allHandler = {
merge: function merge (promise, args) {
// race :: Iterable (Promise e a) -> Promise e a
// Run the inputs through a Race handler constructed with the `never` factory.
function race (promises) {
  return iterablePromise(new Race(never), promises);
}
/**
 * Loose iterable check: accepts any non-null object. It does not look for
 * `Symbol.iterator`.
 *
 * @param {*} x
 * @returns {boolean}
 */
function isIterable (x) {
  return typeof x === 'object' && x !== null;
}
/**
 * Build a promise from an iterable of inputs using `handler`.
 *
 * @param {Object} handler per-item handler
 * @param {*} iterable input; anything failing isIterable() yields a
 *   promise rejected with a TypeError
 * @returns {*} the result of resolveIterable over a fresh Future
 */
function iterablePromise (handler, iterable) {
  if (!isIterable(iterable)) {
    return reject(new TypeError('expected an iterable'));
  }

  var p = new Future();
  return resolveIterable(resolveMaybeThenable, handler, iterable, p);
}
// -------------------------------------------------------------
// # Internals
// -------------------------------------------------------------
// isPromise :: * -> boolean
// True only for instances of this library's Core type.
function isPromise (x) {
  return x instanceof Core;
}
/**
 * Resolve a value already known to be an object or function: library
 * promises are returned via near(), anything else is probed for `then`.
 *
 * @param {Object|Function} x
 * @returns {*} a promise for `x`
 */
function resolveMaybeThenable (x) {
  return isPromise(x) ? x.near() : refForMaybeThenable(fulfill, x);
}
/**
 * Read `x.then` exactly once and branch on it: a function means `x` is a
 * thenable to assimilate, anything else falls back to `otherwise(x)`.
 * An error thrown while reading `then` (or by the chosen branch) produces
 * a Rejected carrying that error.
 *
 * @param {Function} otherwise handles non-thenable `x`
 * @param {Object|Function} x candidate thenable
 * @returns {*} a promise for `x`
 */
function refForMaybeThenable (otherwise, x) {
  try {
    var then$$1 = x.then;
    return typeof then$$1 === 'function'
      ? extractThenable(then$$1, x)
      : otherwise(x);
  } catch (e) {
    return new Rejected(e);
  }
}
// WARNING: Naming the first arg "then" triggers babel compilation bug
/**
 * Assimilate a foreign thenable into a fresh Future by invoking its
 * `then` with resolve/reject callbacks bound to that Future. An error
 * thrown by `then` itself rejects the Future.
 *
 * @param {Function} thn the thenable's `then` method, already read
 * @param {Object|Function} thenable used as `this` for `thn`
 * @returns {*} `near()` of the Future
 */
function extractThenable (thn, thenable) {
  var p = new Future();

  try {
    thn.call(thenable, function (x) { return p._resolve(x); }, function (e) { return p._reject(e); });
  } catch (e) {
    p._reject(e);
  }

  return p.near();
}
/**
 * Create the rejection used when a promise is resolved with itself.
 *
 * @returns {Rejected} rejected with TypeError('resolution cycle')
 */
function cycle () {
  return new Rejected(new TypeError('resolution cycle'));
}
var Continuation = function Continuation (action, promise) {
this.action = action;
this.promise = promise;
}; = function run () {
var Any = function Any () {
this.pending = 0;
Any.prototype.valueAt = function valueAt (x, i, promise) {
Any.prototype.fulfillAt = function fulfillAt (p, i, promise) {
Any.prototype.rejectAt = function rejectAt (p, i, promise) {
this.check(this.pending - 1, promise);
Any.prototype.complete = function complete (total, promise) {
this.check(this.pending + total, promise);
Any.prototype.check = function check (pending, promise) {
this.pending = pending;
if (pending === 0) {
promise._reject(new RangeError('No fulfilled promises in input'));
var Settle = function Settle (resolve, results) {
this.pending = 0;
this.results = results;
this.resolve = resolve;
Settle.prototype.valueAt = function valueAt (x, i, promise) {
this.settleAt(this.resolve(x), i, promise);
Settle.prototype.fulfillAt = function fulfillAt (p, i, promise) {
this.settleAt(p, i, promise);
Settle.prototype.rejectAt = function rejectAt (p, i, promise) {
this.settleAt(p, i, promise);
Settle.prototype.settleAt = function settleAt (p, i, promise) {
this.results[i] = p;
this.check(this.pending - 1, promise);
Settle.prototype.complete = function complete (total, promise) {
this.check(this.pending + total, promise);
Settle.prototype.check = function check (pending, promise) {
this.pending = pending;
if (pending === 0) {
function runPromise$1 (f, thisArg, args, promise) {
/* eslint complexity:[2,5] */
function resolve (x) {
var c = swapContext(promise.context);
function reject (e) {
var c = swapContext(promise.context);
switch (args.length) {
case 0:, resolve, reject);
case 1:, args[0], resolve, reject);
case 2:, args[0], args[1], resolve, reject);
case 3:, args[0], args[1], args[2], resolve, reject);
args.push(resolve, reject);
f.apply(thisArg, args);
return promise
function runNode$1 (f, thisArg, args, promise) {
/* eslint complexity:[2,5] */
function settleNode (e, x) {
var c = swapContext(promise.context);
if (e) {
} else {
switch (args.length) {
case 0:, settleNode);
case 1:, args[0], settleNode);
case 2:, args[0], args[1], settleNode);
case 3:, args[0], args[1], args[2], settleNode);
f.apply(thisArg, args);
return promise
/**
 * Start a Coroutine over `iterator` and hand back the promise it settles.
 *
 * @param {Function} resolve passed through to the Coroutine
 * @param {Object} iterator the generator's iterator
 * @param {Object} promise result promise
 * @returns {Object} `promise`
 */
var _runCoroutine = function (resolve, iterator, promise) {
  new Coroutine(resolve, iterator, promise).run();
  return promise;
};
var Coroutine = (function (Action$$1) {
function Coroutine (resolve, iterator, promise) {
Action$$, promise);
this.resolve = resolve;
this.generator = iterator;
if ( Action$$1 ) Coroutine.__proto__ = Action$$1;
Coroutine.prototype = Object.create( Action$$1 && Action$$1.prototype );
Coroutine.prototype.constructor = Coroutine; = function run () {
this.tryStep(, void 0);
Coroutine.prototype.tryStep = function tryStep (resume, x) {
var context = swapContext(this.context);
var result;
// test if `resume` (and only it) throws
try {
result =, x);
} catch (e) {
} finally {
}// else
Coroutine.prototype.handleResult = function handleResult (result) {
if (result.done) {
return this.promise._resolve(result.value)
Coroutine.prototype.handleReject = function handleReject (e) {
Coroutine.prototype.fulfilled = function fulfilled (p) {
this.tryStep(, p.value);
Coroutine.prototype.rejected = function rejected (p) {
this.tryStep(this.generator.throw, p.value);
return true
return Coroutine;
/* istanbul ignore next */
if (isDebug) {
/**
 * Invoke `generator` with the given receiver and arguments, then drive the
 * resulting iterator as a coroutine that settles a fresh Future.
 *
 * @param {Function} generator generator function
 * @param {*} thisArg receiver for the call
 * @param {Array} args arguments for the call
 * @returns {*} whatever `_runCoroutine` returns
 */
function runGenerator (generator, thisArg, args) {
  var iterator = generator.apply(thisArg, args);
  return _runCoroutine(resolve, iterator, new Future());
}
/**
 * Execute a resolver through `run`, converting a synchronous throw into a
 * rejection of `p` instead of silently discarding it.
 *
 * @param {Function} run strategy invoked as `run(f, thisArg, args, p)`
 * @param {Function} f user-supplied resolver
 * @param {*} thisArg receiver for `f`
 * @param {Array} args arguments for `f`
 * @param {Object} p promise exposing `_reject(e)`
 * @returns {Object} `p`
 */
function runResolver (run, f, thisArg, args, p) {
  try {
    run(f, thisArg, args, p);
  } catch (e) {
    p._reject(e);
  }

  return p;
}
/**
 * Collect `args` through a Merge handler backed by a MergeHandler built
 * from `f` and `thisArg`.
 *
 * @param {Function} f passed to MergeHandler
 * @param {*} thisArg passed to MergeHandler
 * @param {Array|Iterable} args inputs to collect
 * @returns {*} the result of iterablePromise
 */
function runMerge (f, thisArg, args) {
  var handler = new Merge(new MergeHandler(f, thisArg), resultsArray(args));
  return iterablePromise(handler, args);
}
var MergeHandler = function MergeHandler (f, c) {
this.context = pushContext(this.constructor,;
this.f = f;
this.c = c;
this.promise = void 0;
this.args = void 0;
MergeHandler.prototype.merge = function merge (promise, args) {
this.promise = promise;
this.args = args;
}; = function run () {
var c = swapContext(this.context);
try {
this.promise._resolve(this.f.apply(this.c, this.args));
} catch (e) {
/**
 * Guard that a resolver is callable.
 *
 * @param {*} f value expected to be a function
 * @throws {TypeError} when `f` is not a function
 */
function checkFunction (f) {
  if (typeof f !== 'function') {
    throw new TypeError('must provide a resolver function');
  }
}
// -------------------------------------------------------------
// ## ES6 Promise polyfill
// -------------------------------------------------------------
var NOARGS = [];
// type Resolve a = a -> ()
// type Reject e = e -> ()
// Promise :: (Resolve a -> Reject e) -> Promise e a
var CreedPromise = (function (Future$$1) {
function CreedPromise (f) {
runResolver(runPromise$1, f, void 0, NOARGS, this);
if ( Future$$1 ) CreedPromise.__proto__ = Future$$1;
CreedPromise.prototype = Object.create( Future$$1 && Future$$1.prototype );
CreedPromise.prototype.constructor = CreedPromise;
return CreedPromise;
CreedPromise.resolve = resolve;
CreedPromise.reject = reject;
CreedPromise.all = all;
CreedPromise.race = race;
/**
 * Install CreedPromise as the global `Promise` (on `self` when it is
 * defined, otherwise on `global` when that is defined).
 *
 * @returns {Function|false} the `Promise` visible before installation, or
 *   false when none was defined
 */
function shim () {
  /* global self */
  var orig = typeof Promise === 'function' && Promise;

  /* istanbul ignore if */
  if (typeof self !== 'undefined') {
    self.Promise = CreedPromise;
  /* istanbul ignore else */
  } else if (typeof global !== 'undefined') {
    global.Promise = CreedPromise;
  }

  return orig;
}
/* istanbul ignore if */
if (typeof Promise !== 'function') {
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Stream wrapper constructor: stores `source` on the instance.
 *
 * @param {*} source
 */
function Stream (source) {
  this.source = source;
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
// Non-mutating array operations
// cons :: a -> [a] -> [a]
// a with x prepended
/**
 * @param {*} x element to put first
 * @param {ArrayLike} a source; not modified
 * @returns {Array} new array of length `a.length + 1`
 */
function cons (x, a) {
  var l = a.length;
  var b = new Array(l + 1);
  b[0] = x;
  for (var i = 0; i < l; ++i) {
    b[i + 1] = a[i];
  }
  return b;
}
// append :: a -> [a] -> [a]
// a with x appended
/**
 * @param {*} x element to put last
 * @param {ArrayLike} a source; not modified
 * @returns {Array} new array of length `a.length + 1`
 */
function append (x, a) {
  var l = a.length;
  var b = new Array(l + 1);
  for (var i = 0; i < l; ++i) {
    b[i] = a[i];
  }

  b[l] = x;
  return b;
}
// drop :: Int -> [a] -> [a]
// drop first n elements
/**
 * @param {number} n how many leading elements to skip; must be >= 0
 * @param {ArrayLike} a source; not modified
 * @returns {ArrayLike} `a` itself when nothing is dropped (n === 0 or `a`
 *   is empty), a new empty array when n >= a.length, otherwise a new array
 *   holding the remaining elements
 * @throws {TypeError} when n < 0
 */
function drop (n, a) { // eslint-disable-line complexity
  if (n < 0) {
    throw new TypeError('n must be >= 0');
  }

  var l = a.length;
  if (n === 0 || l === 0) {
    return a;
  }

  if (n >= l) {
    return [];
  }

  return unsafeDrop(n, a, l - n);
}
// unsafeDrop :: Int -> [a] -> Int -> [a]
// Internal helper for drop
/**
 * Copy `l` elements of `a` starting at index `n`. Performs no bounds
 * checks; callers must pass consistent `n` and `l`.
 *
 * @param {number} n start index in `a`
 * @param {ArrayLike} a source; not modified
 * @param {number} l number of elements to copy
 * @returns {Array} new array of length `l`
 */
function unsafeDrop (n, a, l) {
  var b = new Array(l);
  for (var i = 0; i < l; ++i) {
    b[i] = a[n + i];
  }
  return b;
}
// tail :: [a] -> [a]
// drop head element
/**
 * @param {ArrayLike} a source; not modified
 * @returns {ArrayLike} result of `drop(1, a)`
 */
function tail (a) {
  return drop(1, a);
}
// copy :: [a] -> [a]
// duplicate a (shallow duplication)
/**
 * @param {ArrayLike} a source; not modified
 * @returns {Array} new array with the same elements (by reference)
 */
function copy (a) {
  var l = a.length;
  var b = new Array(l);
  for (var i = 0; i < l; ++i) {
    b[i] = a[i];
  }
  return b;
}
// map :: (a -> b) -> [a] -> [b]
// transform each element with f
/**
 * @param {Function} f called with each element only (no index argument)
 * @param {ArrayLike} a source; not modified
 * @returns {Array} new array of the transformed elements
 */
function map$1 (f, a) {
  var l = a.length;
  var b = new Array(l);
  for (var i = 0; i < l; ++i) {
    b[i] = f(a[i]);
  }
  return b;
}
// reduce :: (a -> b -> a) -> a -> [b] -> a
// accumulate via left-fold
/**
 * @param {function(*, *, number):*} f reducer, called with (accumulator, element, index)
 * @param {*} z initial accumulator
 * @param {Array} a array to fold
 * @returns {*} final accumulator (z when a is empty)
 */
function reduce (f, z, a) {
  var r = z;
  for (var i = 0, l = a.length; i < l; ++i) {
    r = f(r, a[i], i);
  }
  return r;
}
// replace :: a -> Int -> [a] -> [a]
// replace element at index
/**
 * @param {*} x replacement value
 * @param {number} i index to replace, must be >= 0
 * @param {Array} a source array (not mutated)
 * @returns {Array} new array; a plain copy when i is past the end
 * @throws {TypeError} when i is negative
 */
function replace (x, i, a) { // eslint-disable-line complexity
  if (i < 0) {
    throw new TypeError('i must be >= 0');
  }

  var l = a.length;
  var b = new Array(l);
  for (var j = 0; j < l; ++j) {
    b[j] = i === j ? x : a[j];
  }
  return b;
}
// remove :: Int -> [a] -> [a]
// remove element at index
/**
 * @param {number} i index to remove, must be >= 0
 * @param {Array} a source array (not mutated)
 * @returns {Array} `a` itself when i is out of range, otherwise a new array
 * @throws {TypeError} when i is negative
 */
function remove (i, a) { // eslint-disable-line complexity
  if (i < 0) {
    throw new TypeError('i must be >= 0');
  }

  var l = a.length;
  if (l === 0 || i >= l) { // exit early if index beyond end of array
    return a;
  }

  if (l === 1) { // exit early if index in bounds and length === 1
    return [];
  }

  return unsafeRemove(i, a, l - 1);
}

// unsafeRemove :: Int -> [a] -> Int -> [a]
// Internal helper to remove element at index; l is the result length
function unsafeRemove (i, a, l) {
  var b = new Array(l);
  var j;
  for (j = 0; j < i; ++j) {
    b[j] = a[j];
  }
  for (j = i; j < l; ++j) {
    b[j] = a[j + 1];
  }
  return b;
}
// removeAll :: (a -> boolean) -> [a] -> [a]
// remove all elements matching a predicate
/**
 * @param {function(*):boolean} f predicate; elements for which it is truthy are dropped
 * @param {Array} a source array (not mutated)
 * @returns {Array} new array with the retained elements in original order
 */
function removeAll (f, a) {
  var l = a.length;
  var b = new Array(l);
  var j = 0;
  for (var x, i = 0; i < l; ++i) {
    x = a[i];
    if (!f(x)) {
      b[j] = x;
      // advance the write cursor; without it every kept element lands in
      // slot 0 and the result is truncated to length 0
      ++j;
    }
  }

  b.length = j;
  return b;
}
// findIndex :: a -> [a] -> Int
// find index of x in a, from the left
/**
 * @param {*} x value to look for (compared with ===)
 * @param {Array} a array to search
 * @returns {number} index of the first match, or -1 when absent
 */
function findIndex (x, a) {
  for (var i = 0, l = a.length; i < l; ++i) {
    if (x === a[i]) {
      return i;
    }
  }
  return -1;
}
// isArrayLike :: * -> boolean
// Return true iff x is array-like
/**
 * @param {*} x value to test
 * @returns {boolean} true when x is non-null, not a function, and has a numeric length
 */
function isArrayLike (x) {
  return x != null && typeof x.length === 'number' && typeof x !== 'function';
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
// id :: a -> a
// Hands its argument back unchanged.
var id = function (value) {
  return value;
};

// compose :: (b -> c) -> (a -> b) -> (a -> c)
// Builds a function that runs `inner` first and feeds its result to `outer`.
var compose = function (outer, inner) {
  return function (input) {
    var intermediate = inner(input);
    return outer(intermediate);
  };
};

// apply :: (a -> b) -> a -> b
// Calls `fn` with the single argument `arg`.
var apply = function (fn, arg) {
  return fn(arg);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a new Disposable which will dispose its underlying resource.
 * @param {function} dispose function
 * @param {*?} data any data to be passed to disposer function
 * @constructor
 */
function Disposable (dispose, data) {
  this._dispose = dispose;
  this._data = data;
}

/**
 * Invoke the disposer with the stored data.
 * @returns {*} whatever the disposer returns
 */
Disposable.prototype.dispose = function () {
  return this._dispose(this._data);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Disposable proxy whose underlying disposable can be supplied after
 * construction, at most once.
 * @constructor
 */
function SettableDisposable () {
  this.disposable = void 0;
  this.disposed = false;
  this._resolve = void 0;

  var self = this;
  // Returned by dispose() while no underlying disposable has been set.
  this.result = new Promise(function (resolve) {
    self._resolve = resolve;
  });
}

/**
 * Set the underlying disposable.
 * @param {{dispose: function}} disposable
 * @throws {Error} when called more than once
 */
SettableDisposable.prototype.setDisposable = function (disposable) {
  if (this.disposable !== void 0) {
    throw new Error('setDisposable called more than once');
  }

  this.disposable = disposable;

  if (this.disposed) {
    // NOTE(review): this statement was missing from the listing; resolving
    // the pending result is the only use of `_resolve` — confirm upstream.
    this._resolve(disposable.dispose());
  }
};

/**
 * Dispose the underlying disposable if one has been set.
 * @returns {*} the underlying dispose() result, or the pending promise when
 *  no disposable has been set yet; repeat calls return the stored result
 */
SettableDisposable.prototype.dispose = function () {
  if (this.disposed) {
    return this.result;
  }

  this.disposed = true;

  if (this.disposable !== void 0) {
    this.result = this.disposable.dispose();
  }

  return this.result;
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * @param {*} p value to test
 * @returns {boolean} true when p is a non-null object with a `then` function
 */
function isPromise$1 (p) {
  return p !== null && typeof p === 'object' && typeof p.then === 'function';
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Local aliases for the array `map$1` helper and the `id` function.
var map$2 = map$1;
var identity = id;
/**
 * Call disposable.dispose. If it returns a promise, catch promise
 * error and forward it through the provided sink.
 * @param {number} t time
 * @param {{dispose: function}} disposable
 * @param {{error: function}} sink
 * @return {*} result of disposable.dispose
 */
function tryDispose (t, disposable, sink) {
  var result = disposeSafely(disposable);
  return isPromise$1(result)
    ? result.catch(function (e) {
      sink.error(t, e);
    })
    : result;
}
/**
 * Create a new Disposable which will dispose its underlying resource
 * at most once.
 * @param {function} dispose function
 * @param {*?} data any data to be passed to disposer function
 * @return {Disposable}
 */
function create (dispose, data) {
  return once(new Disposable(dispose, data));
}
/**
 * Create a noop disposable. Can be used to satisfy a Disposable
 * requirement when no actual resource needs to be disposed.
 * @return {Disposable|exports|module.exports}
 */
function empty$1 () {
  return new Disposable(identity, void 0);
}
/**
 * Create a disposable that will dispose all input disposables in parallel.
 * @param {Array<Disposable>} disposables
 * @return {Disposable}
 */
function all$1 (disposables) {
  return create(disposeAll, disposables);
}

/**
 * Dispose every disposable and combine the results.
 * @param {Array<Disposable>} disposables
 * @return {Promise} Promise.all over the individual dispose results
 */
function disposeAll (disposables) {
  return Promise.all(map$2(disposeSafely, disposables));
}
/**
 * Call disposable.dispose(), converting a synchronous throw into a
 * rejected promise.
 * @param {{dispose: function}} disposable
 * @return {*} the dispose() result, or a promise rejected with the thrown error
 */
function disposeSafely (disposable) {
  try {
    return disposable.dispose();
  } catch (e) {
    return Promise.reject(e);
  }
}
* Create a disposable from a promise for another disposable
* @param {Promise<Disposable>} disposablePromise
* @return {Disposable}
/**
 * Create a disposable proxy that allows its underlying disposable to
 * be set later.
 * @return {SettableDisposable}
 */
function settable () {
  return new SettableDisposable();
}
/**
 * Wrap an existing disposable (which may not already have been once()d)
 * so that it will only dispose its underlying resource at most once.
 * @param {{ dispose: function() }} disposable
 * @return {Disposable} wrapped disposable
 */
function once (disposable) {
  return new Disposable(disposeMemoized, memoized(disposable));
}

/**
 * Dispose the wrapped disposable on the first call and cache the result.
 * @param {{disposed: boolean, disposable: *, value: *}} memoized record from memoized()
 * @return {*} the cached dispose result
 */
function disposeMemoized (memoized) {
  if (!memoized.disposed) {
    memoized.disposed = true;
    memoized.value = disposeSafely(memoized.disposable);
    // drop the reference once it has been disposed
    memoized.disposable = void 0;
  }

  return memoized.value;
}

/**
 * Build the bookkeeping record used by disposeMemoized.
 * @param {{ dispose: function() }} disposable
 * @return {{disposed: boolean, disposable: *, value: *}}
 */
function memoized (disposable) {
  return { disposed: false, disposable: disposable, value: void 0 };
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Rethrow e from a zero-delay timer callback, so it is thrown outside the
 * current call stack.
 * @param {*} e error to rethrow
 */
function fatalError (e) {
  setTimeout(function () {
    throw e;
  }, 0);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Task that propagates a value to a sink when run, unless disposed first.
 * @param {function(number, *, object)} run called as run(t, value, sink)
 * @param {*} value value handed to run
 * @param {{event: function, end: function, error: function}} sink
 * @constructor
 */
function PropagateTask (run, value, sink) {
  this._run = run;
  this.value = value;
  this.sink = sink;
  this.active = true;
}

// Factory: task that calls sink.event(t, value)
PropagateTask.event = function (value, sink) {
  return new PropagateTask(emit, value, sink);
};

// Factory: task that calls sink.end(t, value)
PropagateTask.end = function (value, sink) {
  return new PropagateTask(end, value, sink);
};

// Factory: task that calls sink.error(t, value)
PropagateTask.error = function (value, sink) {
  return new PropagateTask(error, value, sink);
};

// Deactivate the task; subsequent run() calls do nothing
PropagateTask.prototype.dispose = function () {
  this.active = false;
};

PropagateTask.prototype.run = function (t) {
  if (!this.active) {
    return;
  }
  this._run(t, this.value, this.sink);
};

// Forward an error to the sink; after disposal escalate to fatalError
PropagateTask.prototype.error = function (t, e) {
  if (!this.active) {
    return fatalError(e);
  }
  this.sink.error(t, e);
};

function error (t, e, sink) {
  sink.error(t, e);
}

function emit (t, x, sink) {
  sink.event(t, x);
}

function end (t, x, sink) {
  sink.end(t, x);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Stream containing only x
 * @param {*} x
 * @returns {Stream}
 */
function of (x) {
  return new Stream(new Just(x));
}

// Source that emits a single stored value and then ends
function Just (x) {
  this.value = x;
}

Just.prototype.run = function (sink, scheduler) {
  return scheduler.asap(new PropagateTask(runJust, this.value, sink));
};

// Emit x, then end with undefined, both at time t
function runJust (t, x, sink) {
  sink.event(t, x);
  sink.end(t, void 0);
}
/**
 * Stream containing no events and ends immediately
 * @returns {Stream}
 */
function empty$$1 () {
  return EMPTY;
}

function EmptySource () {}

EmptySource.prototype.run = function (sink, scheduler) {
  var task = PropagateTask.end(void 0, sink);
  // NOTE(review): this scheduling call was missing from the listing; without
  // it the end task would never run — confirm against upstream most.js.
  scheduler.asap(task);

  return create(disposeEmpty, task);
};

function disposeEmpty (task) {
  return task.dispose();
}

// Shared instance returned by every empty$$1() call
var EMPTY = new Stream(new EmptySource());
/**
 * Stream containing no events and never ends
 * @returns {Stream}
 */
function never$1 () {
  return NEVER$1;
}

function NeverSource () {}

// Schedules nothing; only hands back a noop disposable
NeverSource.prototype.run = function () {
  return empty$1();
};

// Shared instance returned by every never$1() call
var NEVER$1 = new Stream(new NeverSource());
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream that emits the items of an array or array-like.
 * @param {Array|{length: number}} a
 * @returns {Stream}
 */
function fromArray (a) {
  return new Stream(new ArraySource(a));
}

function ArraySource (a) {
  this.array = a;
}

ArraySource.prototype.run = function (sink, scheduler) {
  return scheduler.asap(new PropagateTask(runProducer, this.array, sink));
};

// Runs as PropagateTask._run, so `this` is the task: emission stops as soon
// as the task is disposed, and end is only sent while it is still active.
function runProducer (t, array, sink) {
  for (var i = 0, l = array.length; i < l && this.active; ++i) {
    sink.event(t, array[i]);
  }

  this.active && end(t);

  function end (t) {
    sink.end(t);
  }
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/*global Set, Symbol*/
// Property key under which this environment exposes iterators.
var iteratorSymbol;
// Firefox ships a partial implementation using the name @@iterator.
if (typeof Set === 'function' && typeof new Set()['@@iterator'] === 'function') {
  iteratorSymbol = '@@iterator';
} else {
  // NOTE(review): the fallback operand was missing from the listing;
  // upstream most.js uses the es6-shim key here — confirm.
  iteratorSymbol = typeof Symbol === 'function' && Symbol.iterator ||
    '_es6shim_iterator_';
}

/**
 * @param {*} o non-null value
 * @returns {boolean} true when o has a function under the iterator key
 */
function isIterable$1 (o) {
  return typeof o[iteratorSymbol] === 'function';
}

/**
 * @param {*} o iterable
 * @returns {*} result of calling o's iterator method
 */
function getIterator (o) {
  return o[iteratorSymbol]();
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream that emits the items produced by an iterable.
 * @param {*} iterable
 * @returns {Stream}
 */
function fromIterable (iterable) {
  return new Stream(new IterableSource(iterable));
}

function IterableSource (iterable) {
  this.iterable = iterable;
}

IterableSource.prototype.run = function (sink, scheduler) {
  return scheduler.asap(new PropagateTask(runProducer$1, getIterator(this.iterable), sink));
};

// Runs as PropagateTask._run, so `this` is the task: iteration stops as soon
// as the task is disposed. The stream ends with the last iteration result's value.
function runProducer$1 (t, iterator, sink) {
  var r = iterator.next();

  while (!r.done && this.active) {
    sink.event(t, r.value);
    r = iterator.next();
  }

  sink.end(t, r.value);
}
// Global object: the first of window, global, self that is defined; otherwise a fresh empty object.
var commonjsGlobal = typeof window !== 'undefined' ? window : typeof global !== 'undefined' ? global : typeof self !== 'undefined' ? self : {};

/**
 * Run a CommonJS-style module body against a fresh `module` object.
 * @param {function(object, object)} fn module body, called as fn(module, module.exports)
 * @param {object} [module] ignored; always replaced by a fresh `{ exports: {} }`
 * @returns {*} module.exports after fn has run
 */
function createCommonjsModule$1 (fn, module) {
  module = { exports: {} };
  fn(module, module.exports);
  return module.exports;
}
// Module wrapper exporting a function that resolves the `observable` key for a root object.
var ponyfill = createCommonjsModule$1(function (module, exports) {
  'use strict';

  Object.defineProperty(exports, "__esModule", {
    value: true
  });
  exports['default'] = symbolObservablePonyfill;

  /**
   * @param {object} root object whose `Symbol` property is consulted
   * @returns {symbol|string} root.Symbol.observable (created and stored on
   *  root.Symbol when absent), or '@@observable' when root.Symbol is not a function
   */
  function symbolObservablePonyfill(root) {
    var result;
    var _Symbol = root.Symbol;

    if (typeof _Symbol === 'function') {
      if (_Symbol.observable) {
        result = _Symbol.observable;
      } else {
        result = _Symbol('observable');
        _Symbol.observable = result;
      }
    } else {
      result = '@@observable';
    }

    return result;
  }
});
// Module wrapper: picks a root object and exports the ponyfill's result for it as `default`.
var index$2 = createCommonjsModule$1(function (module, exports) {
  'use strict';

  Object.defineProperty(exports, "__esModule", {
    value: true
  });

  var _ponyfill2 = _interopRequireDefault(ponyfill);

  function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { 'default': obj }; }

  var root; /* global window */

  // Root preference order: self, window, commonjsGlobal, then the module object.
  if (typeof self !== 'undefined') {
    root = self;
  } else if (typeof window !== 'undefined') {
    root = window;
  } else if (typeof commonjsGlobal !== 'undefined') {
    root = commonjsGlobal;
  } else {
    root = module;
  }

  var result = (0, _ponyfill2['default'])(root);
  exports['default'] = result;
});

// NOTE(review): index$1 is the module's exports object, not its `default`
// member; verify this is what the consumer indexing with it expects.
var index$1 = index$2;
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Look up and invoke the observable-interop method of o.
 * @param {*} o candidate object
 * @returns {object|null} the object returned by the interop method, or null
 *  when o is falsy or has no such method
 * @throws {TypeError} when the method's result has no subscribe function
 */
function getObservable (o) { // eslint-disable-line complexity
  var obs = null;
  if (o) {
    // Access foreign method only once
    var method = o[index$1];
    if (typeof method === 'function') {
      obs = method.call(o);
      if (!(obs && typeof obs.subscribe === 'function')) {
        throw new TypeError('invalid observable ' + obs);
      }
    }
  }

  return obs;
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Deliver an event to sink, routing a synchronous throw to sink.error.
 * @param {number} t time
 * @param {*} x event value
 * @param {{event: function, error: function}} sink
 */
function tryEvent (t, x, sink) {
  try {
    sink.event(t, x);
  } catch (e) {
    sink.error(t, e);
  }
}
/**
 * Deliver end to sink, routing a synchronous throw to sink.error.
 * @param {number} t time
 * @param {*} x end value
 * @param {{end: function, error: function}} sink
 */
function tryEnd (t, x, sink) {
  try {
    sink.end(t, x);
  } catch (e) {
    sink.error(t, e);
  }
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream from an object with a subscribe method.
 * @param {{subscribe: function}} observable
 * @returns {Stream}
 */
function fromObservable (observable) {
  return new Stream(new ObservableSource(observable));
}

function ObservableSource (observable) {
  this.observable = observable;
}

/**
 * Subscribe to the observable and adapt its subscription to a disposable.
 * @throws {TypeError} when subscribe returns neither a function nor an
 *  object with an unsubscribe function
 */
ObservableSource.prototype.run = function (sink, scheduler) {
  var sub = this.observable.subscribe(new SubscriberSink(sink, scheduler));
  if (typeof sub === 'function') {
    return create(sub);
  } else if (sub && typeof sub.unsubscribe === 'function') {
    return create(unsubscribe, sub);
  }

  throw new TypeError('Observable returned invalid subscription ' + String(sub));
};

// Observer that timestamps notifications with scheduler.now() and forwards them to sink
function SubscriberSink (sink, scheduler) {
  this.sink = sink;
  this.scheduler = scheduler;
}

SubscriberSink.prototype.next = function (x) {
  tryEvent(this.scheduler.now(), x, this.sink);
};

SubscriberSink.prototype.complete = function (x) {
  tryEnd(this.scheduler.now(), x, this.sink);
};

SubscriberSink.prototype.error = function (e) {
  this.sink.error(this.scheduler.now(), e);
};

function unsubscribe (subscription) {
  return subscription.unsubscribe();
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Coerce a value to a Stream. Checks are applied in order: existing Stream,
 * observable, array or array-like, iterable.
 * @param {*} a
 * @returns {Stream}
 * @throws {TypeError} when a matches none of the supported shapes
 */
function from (a) { // eslint-disable-line complexity
  if (a instanceof Stream) {
    return a;
  }

  var observable = getObservable(a);
  if (observable != null) {
    return fromObservable(observable);
  }

  if (Array.isArray(a) || isArrayLike(a)) {
    return fromArray(a);
  }

  if (isIterable$1(a)) {
    return fromIterable(a);
  }

  throw new TypeError('from(x) must be observable, iterable, or array-like: ' + a);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream that emits the current time periodically
 * @param {Number} period periodicity of events in millis
 * @param {*} deprecatedValue @deprecated value to emit each period
 * @returns {Stream} new stream that emits the current time every period
 */
function periodic (period, deprecatedValue) {
  return new Stream(new Periodic(period, deprecatedValue));
}

function Periodic (period, value) {
  this.period = period;
  this.value = value;
}

Periodic.prototype.run = function (sink, scheduler) {
  return scheduler.periodic(this.period, PropagateTask.event(this.value, sink));
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * A task paired with its scheduled time, repeat period and owning scheduler.
 * @param {number} delay stored as `time`
 * @param {number} period repeat period
 * @param {{run: function, error: function, dispose: function}} task
 * @param {{cancel: function}} scheduler
 * @constructor
 */
function ScheduledTask (delay, period, task, scheduler) {
  this.time = delay;
  this.period = period;
  this.task = task;
  this.scheduler = scheduler;
  this.active = true;
}

// Run the wrapped task at this task's scheduled time
ScheduledTask.prototype.run = function () {
  return this.task.run(this.time);
};

ScheduledTask.prototype.error = function (e) {
  return this.task.error(this.time, e);
};

// NOTE(review): the cancel call was missing from the listing; restored from
// upstream most.js, where dispose removes the task from its scheduler first.
ScheduledTask.prototype.dispose = function () {
  this.scheduler.cancel(this);
  return this.task.dispose();
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Run a task asynchronously via a resolved promise.
 * @param {{run: function, error: function}} task
 * @returns {Promise} promise for the result of runTask(task)
 */
function defer (task) {
  return Promise.resolve(task).then(runTask);
}

/**
 * Run a task, routing a synchronous throw to task.error.
 * @param {{run: function, error: function}} task
 * @returns {*} task.run()'s result, or task.error(e)'s result when run throws
 */
function runTask (task) {
  try {
    return task.run();
  } catch (e) {
    return task.error(e);
  }
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// NOTE(review): several single-call statements in this block were missing
// from the listing (now(), timeline.add, _reschedule, _unschedule, ...);
// they are restored from upstream most.js — confirm against the real bundle.

/**
 * Schedules tasks on a timeline and drives them with a timer.
 * @param {{now: function, setTimer: function, clearTimer: function}} timer
 * @param {{add: function, remove: function, removeAll: function,
 *  isEmpty: function, nextArrival: function, runTasks: function}} timeline
 * @constructor
 */
function Scheduler (timer, timeline) {
  this.timer = timer;
  this.timeline = timeline;

  this._timer = null;
  this._nextArrival = Infinity;

  var self = this;
  // Callback handed to the timer in _scheduleNextArrival
  this._runReadyTasksBound = function () {
    self._runReadyTasks(self.now());
  };
}

Scheduler.prototype.now = function () {
  return this.timer.now();
};

// Schedule task with zero delay, non-repeating (period -1)
Scheduler.prototype.asap = function (task) {
  return this.schedule(0, -1, task);
};

// Schedule task after `delay`, non-repeating (period -1)
Scheduler.prototype.delay = function (delay, task) {
  return this.schedule(delay, -1, task);
};

// Schedule task with zero delay, repeating every `period`
Scheduler.prototype.periodic = function (period, task) {
  return this.schedule(0, period, task);
};

/**
 * @param {number} delay negative values are treated as 0
 * @param {number} period repeat period
 * @param {object} task
 * @returns {ScheduledTask}
 */
Scheduler.prototype.schedule = function (delay, period, task) {
  var now = this.now();
  var st = new ScheduledTask(now + Math.max(0, delay), period, task, this);

  this.timeline.add(st);
  this._scheduleNextRun(now);
  return st;
};

Scheduler.prototype.cancel = function (task) {
  task.active = false;
  if (this.timeline.remove(task)) {
    this._reschedule();
  }
};

Scheduler.prototype.cancelAll = function (f) {
  this.timeline.removeAll(f);
  this._reschedule();
};

Scheduler.prototype._reschedule = function () {
  if (this.timeline.isEmpty()) {
    this._unschedule();
  } else {
    this._scheduleNextRun(this.now());
  }
};

Scheduler.prototype._unschedule = function () {
  this.timer.clearTimer(this._timer);
  this._timer = null;
};

Scheduler.prototype._scheduleNextRun = function (now) { // eslint-disable-line complexity
  if (this.timeline.isEmpty()) {
    return;
  }

  var nextArrival = this.timeline.nextArrival();

  if (this._timer === null) {
    this._scheduleNextArrival(nextArrival, now);
  } else if (nextArrival < this._nextArrival) {
    // an earlier task arrived: replace the pending timer
    this._unschedule();
    this._scheduleNextArrival(nextArrival, now);
  }
};

Scheduler.prototype._scheduleNextArrival = function (nextArrival, now) {
  this._nextArrival = nextArrival;
  var delay = Math.max(0, nextArrival - now);
  this._timer = this.timer.setTimer(this._runReadyTasksBound, delay);
};

Scheduler.prototype._runReadyTasks = function (now) {
  this._timer = null;
  this.timeline.runTasks(now, runTask);
  this._scheduleNextRun(this.now());
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/*global setTimeout, clearTimeout*/

// Timer backed by Date.now and setTimeout.
function ClockTimer () {}

ClockTimer.prototype.now = Date.now;

/**
 * @param {function} f callback
 * @param {number} dt delay in ms; when <= 0 the callback is deferred via
 *  runAsap instead of setTimeout
 * @returns {*} an Asap handle or a setTimeout handle
 */
ClockTimer.prototype.setTimer = function (f, dt) {
  return dt <= 0 ? runAsap(f) : setTimeout(f, dt);
};

ClockTimer.prototype.clearTimer = function (t) {
  return t instanceof Asap ? t.cancel() : clearTimeout(t);
};

// Cancelable task wrapper around a callback
function Asap (f) {
  this.f = f;
  this.active = true;
}

Asap.prototype.run = function () {
  return this.active && this.f();
};

Asap.prototype.error = function (e) {
  throw e;
};

Asap.prototype.cancel = function () {
  this.active = false;
};

// NOTE(review): the defer(task) call was missing from the listing; restored
// from upstream most.js — without it the callback would never be invoked.
function runAsap (f) {
  var task = new Asap(f);
  defer(task);
  return task;
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Time-ordered list of timeslots, each `{ time, events }` holding the tasks
 * scheduled for that time.
 * @constructor
 */
function Timeline () {
  this.tasks = [];
}

// Time of the earliest timeslot, or Infinity when empty
Timeline.prototype.nextArrival = function () {
  return this.isEmpty() ? Infinity : this.tasks[0].time;
};

Timeline.prototype.isEmpty = function () {
  return this.tasks.length === 0;
};

Timeline.prototype.add = function (st) {
  insertByTime(st, this.tasks);
};

/**
 * Remove a scheduled task.
 * @param {{time: number}} st
 * @returns {boolean} true when st was found and removed
 */
Timeline.prototype.remove = function (st) {
  var i = binarySearch(st.time, this.tasks);

  if (i >= 0 && i < this.tasks.length) {
    var at = findIndex(st, this.tasks[i].events);
    if (at >= 0) {
      this.tasks[i].events.splice(at, 1);
      return true;
    }
  }

  return false;
};

// Remove every task for which predicate f is truthy
Timeline.prototype.removeAll = function (f) {
  for (var i = 0, l = this.tasks.length; i < l; ++i) {
    removeAllFrom(f, this.tasks[i]);
  }
};

/**
 * Run every task whose timeslot time is <= t.
 * @param {number} t current time
 * @param {function(object)} runTask invoked once per ready, active task
 */
Timeline.prototype.runTasks = function (t, runTask) {
  var tasks = this.tasks;
  var l = tasks.length;
  var i = 0;

  while (i < l && tasks[i].time <= t) {
    ++i;
  }

  // Detach the ready timeslots first so tasks rescheduled while running are
  // inserted into the remaining timeline.
  this.tasks = tasks.slice(i);

  // Run all ready tasks
  for (var j = 0; j < i; ++j) {
    this.tasks = runTasks(runTask, tasks[j], this.tasks);
  }
};

function runTasks (runTask, timeslot, tasks) { // eslint-disable-line complexity
  var events = timeslot.events;
  for (var i = 0; i < events.length; ++i) {
    var task = events[i];

    if (task.active) {
      runTask(task);

      // Reschedule periodic repeating tasks
      // Check active again, since a task may have canceled itself
      if (task.period >= 0 && task.active) {
        task.time = task.time + task.period;
        insertByTime(task, tasks);
      }
    }
  }

  return tasks;
}

// Insert task into the timeslot matching task.time, creating one if needed
function insertByTime (task, timeslots) { // eslint-disable-line complexity
  var l = timeslots.length;

  if (l === 0) {
    timeslots.push(newTimeslot(task.time, [task]));
    return;
  }

  var i = binarySearch(task.time, timeslots);

  if (i >= l) {
    timeslots.push(newTimeslot(task.time, [task]));
  } else if (task.time === timeslots[i].time) {
    timeslots[i].events.push(task);
  } else {
    timeslots.splice(i, 0, newTimeslot(task.time, [task]));
  }
}

function removeAllFrom (f, timeslot) {
  timeslot.events = removeAll(f, timeslot.events);
}

/**
 * @param {number} t time to locate
 * @param {Array<{time: number}>} sortedArray timeslots ordered by time
 * @returns {number} index of the slot with time t, or the index at which a
 *  slot for t would have to be inserted
 */
function binarySearch (t, sortedArray) { // eslint-disable-line complexity
  var lo = 0;
  var hi = sortedArray.length;
  var mid, y;

  while (lo < hi) {
    mid = Math.floor((lo + hi) / 2);
    y = sortedArray[mid];

    if (t === y.time) {
      return mid;
    } else if (t < y.time) {
      hi = mid;
    } else {
      lo = mid + 1;
    }
  }
  return hi;
}

function newTimeslot (t, events) {
  return { time: t, events: events };
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Scheduler instance built from a ClockTimer and a fresh Timeline.
var defaultScheduler = new Scheduler(new ClockTimer(), new Timeline());
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Subscribe an observer-style object to a stream on the default scheduler.
 * @param {{next: ?function, complete: ?function, error: ?function}} subscriber
 * @param {Stream} stream
 * @returns {Subscription}
 * @throws {TypeError} when subscriber is null/undefined or not an object
 */
function subscribe (subscriber, stream) {
  if (subscriber == null || typeof subscriber !== 'object') {
    throw new TypeError('subscriber must be an object');
  }

  var disposable = settable();
  var observer = new SubscribeObserver(fatalError, subscriber, disposable);

  disposable.setDisposable(stream.source.run(observer, defaultScheduler));

  return new Subscription(disposable);
}
/**
 * Sink that adapts stream events to a subscriber's next/complete/error.
 * @param {function(*)} fatalError$$1 last-resort handler for errors thrown by subscriber callbacks
 * @param {{next: ?function, complete: ?function, error: ?function}} subscriber
 * @param {{disposed: boolean, dispose: function}} disposable
 * @constructor
 */
function SubscribeObserver (fatalError$$1, subscriber, disposable) {
  this.fatalError = fatalError$$1;
  this.subscriber = subscriber;
  this.disposable = disposable;
}

SubscribeObserver.prototype.event = function (t, x) {
  if (!this.disposable.disposed && typeof this.subscriber.next === 'function') {
    this.subscriber.next(x);
  }
};

SubscribeObserver.prototype.end = function (t, x) {
  if (!this.disposable.disposed) {
    var s = this.subscriber;
    doDispose(this.fatalError, s, s.complete, s.error, this.disposable, x);
  }
};

SubscribeObserver.prototype.error = function (t, e) {
  var s = this.subscriber;
  doDispose(this.fatalError, s, s.error, s.error, this.disposable, e);
};

// Handle returned to subscribers; unsubscribe disposes the underlying resources
function Subscription (disposable) {
  this.disposable = disposable;
}

Subscription.prototype.unsubscribe = function () {
  this.disposable.dispose();
};

/**
 * Dispose, then notify the subscriber.
 * @param {function(*)} fatal receives anything thrown by the `error` callback
 * @param {object} subscriber `this` for the callbacks
 * @param {?function} complete called with x after disposal succeeds
 * @param {?function} error called when disposal or `complete` fails
 * @param {{dispose: function}} disposable
 * @param {*} x value passed to `complete`
 */
function doDispose (fatal, subscriber, complete, error, disposable, x) {
  Promise.resolve(disposable.dispose()).then(function () {
    if (typeof complete === 'function') {
      complete.call(subscriber, x);
    }
  }).catch(function (e) {
    if (typeof error === 'function') {
      error.call(subscriber, e);
    }
    // NOTE(review): the trailing catch was missing from the listing; it is
    // the only use of the `fatal` parameter — confirm against upstream.
  }).catch(fatal);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Apply f to a stream.
 * @param {function(Stream):*} f
 * @param {Stream} stream
 * @returns {*} whatever f returns
 */
function thru (f, stream) {
  return f(stream);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Source that listens on an object exposing addEventListener/removeEventListener.
 * @param {String} event event type name
 * @param {{addEventListener: function, removeEventListener: function}} source
 * @param {*} capture passed as 3rd argument to add/removeEventListener
 * @constructor
 */
function EventTargetSource (event, source, capture) {
  this.event = event;
  this.source = source;
  this.capture = capture;
}

EventTargetSource.prototype.run = function (sink, scheduler) {
  function addEvent (e) {
    tryEvent(scheduler.now(), e, sink);
  }

  this.source.addEventListener(this.event, addEvent, this.capture);

  return create(disposeEventTarget,
    { target: this, addEvent: addEvent });
};

// Remove the listener that run() registered
function disposeEventTarget (info) {
  var target = info.target;
  target.source.removeEventListener(target.event, info.addEvent, target.capture);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Sink that buffers events and forwards them to the wrapped sink from a
 * deferred task rather than in the caller's stack.
 * @param {{event: function, end: function, error: function}} sink
 * @constructor
 */
function DeferredSink (sink) {
  this.sink = sink;
  this.events = [];
  this.active = true;
}

DeferredSink.prototype.event = function (t, x) {
  if (!this.active) {
    return;
  }

  // The first buffered event schedules a flush; later ones ride along
  if (this.events.length === 0) {
    defer(new PropagateAllTask(this.sink, t, this.events));
  }

  this.events.push({ time: t, value: x });
};

DeferredSink.prototype.end = function (t, x) {
  if (!this.active) {
    return;
  }

  this._end(new EndTask(t, x, this.sink));
};

DeferredSink.prototype.error = function (t, e) {
  this._end(new ErrorTask(t, e, this.sink));
};

// Deactivate, then defer the terminal task
// NOTE(review): the defer(task) call was missing from the listing; restored
// from upstream most.js — confirm.
DeferredSink.prototype._end = function (task) {
  this.active = false;
  defer(task);
};
/**
 * Task that flushes a shared buffer of `{ time, value }` events to a sink.
 * @param {{event: function, error: function}} sink
 * @param {number} time initial time, reported to sink.error until an event is flushed
 * @param {Array<{time: number, value: *}>} events buffer; emptied in place by run()
 * @constructor
 */
function PropagateAllTask (sink, time, events) {
  this.sink = sink;
  this.events = events;
  this.time = time;
}

PropagateAllTask.prototype.run = function () {
  var events = this.events;
  var sink = this.sink;
  var event;

  for (var i = 0, l = events.length; i < l; ++i) {
    event = events[i];
    // remember the time of the event in flight for error reporting
    this.time = event.time;
    sink.event(event.time, event.value);
  }

  events.length = 0;
};

PropagateAllTask.prototype.error = function (e) {
  this.sink.error(this.time, e);
};
/**
 * Task that ends a sink with a stored time and value.
 * @param {number} t time
 * @param {*} x end value
 * @param {{end: function, error: function}} sink
 * @constructor
 */
function EndTask (t, x, sink) {
  this.time = t;
  this.value = x;
  this.sink = sink;
}

EndTask.prototype.run = function () {
  this.sink.end(this.time, this.value);
};

EndTask.prototype.error = function (e) {
  this.sink.error(this.time, e);
};
/**
 * Task that reports a stored error to a sink.
 * @param {number} t time
 * @param {*} e error value
 * @param {{error: function}} sink
 * @constructor
 */
function ErrorTask (t, e, sink) {
  this.time = t;
  this.value = e;
  this.sink = sink;
}

ErrorTask.prototype.run = function () {
  this.sink.error(this.time, this.value);
};

// A failure while reporting an error cannot be forwarded anywhere; rethrow it
ErrorTask.prototype.error = function (e) {
  throw e;
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Source that listens on an object exposing addListener/removeListener.
 * @param {String} event event name
 * @param {{addListener: function, removeListener: function}} source
 * @constructor
 */
function EventEmitterSource (event, source) {
  this.event = event;
  this.source = source;
}

EventEmitterSource.prototype.run = function (sink, scheduler) {
  // NOTE: Because EventEmitter allows events in the same call stack as
  // a listener is added, use a DeferredSink to buffer events
  // until the stack clears, then propagate.  This maintains most.js's
  // invariant that no event will be delivered in the same call stack
  // as an observer begins observing.
  var dsink = new DeferredSink(sink);

  // Listener: multiple arguments are emitted as one array value,
  // a single argument is emitted as-is
  function addEventVariadic (a) {
    var l = arguments.length;
    if (l > 1) {
      var arr = new Array(l);
      for (var i = 0; i < l; ++i) {
        arr[i] = arguments[i];
      }
      tryEvent(scheduler.now(), arr, dsink);
    } else {
      tryEvent(scheduler.now(), a, dsink);
    }
  }

  this.source.addListener(this.event, addEventVariadic);

  return create(disposeEventEmitter, { target: this, addEvent: addEventVariadic });
};

// Remove the listener that run() registered
function disposeEventEmitter (info) {
  var target = info.target;
  target.source.removeListener(target.event, info.addEvent);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream from an EventTarget, such as a DOM Node, or EventEmitter.
 * @param {String} event event type name, e.g. 'click'
 * @param {EventTarget|EventEmitter} source EventTarget or EventEmitter
 * @param {*?} capture for DOM events, whether to use
 *  capturing--passed as 3rd parameter to addEventListener.
 * @returns {Stream} stream containing all events of the specified type
 * from the source.
 * @throws {Error} when source supports neither listener API
 */
function fromEvent (event, source, capture) { // eslint-disable-line complexity
  var s;

  if (typeof source.addEventListener === 'function' && typeof source.removeEventListener === 'function') {
    if (arguments.length < 3) {
      capture = false;
    }

    s = new EventTargetSource(event, source, capture);
  } else if (typeof source.addListener === 'function' && typeof source.removeListener === 'function') {
    s = new EventEmitterSource(event, source);
  } else {
    throw new Error('source must support addEventListener/removeEventListener or addListener/removeListener');
  }

  return new Stream(s);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Run a source to completion on the default scheduler.
 * @param {{run: function}} source
 * @returns {Promise} see withScheduler
 */
function withDefaultScheduler (source) {
  return withScheduler(source, defaultScheduler);
}

/**
 * Run a source to completion on the given scheduler.
 * @param {{run: function}} source
 * @param {object} scheduler
 * @returns {Promise} resolved when the source ends, rejected when it errors
 */
function withScheduler (source, scheduler) {
  return new Promise(function (resolve, reject) {
    runSource(source, scheduler, resolve, reject);
  });
}

function runSource (source, scheduler, resolve, reject) {
  var disposable = settable();
  var observer = new Drain(resolve, reject, disposable);

  disposable.setDisposable(source.run(observer, scheduler));
}
/**
 * Sink that ignores events and reports only the terminal end or error,
 * after disposing.
 * @param {function(*)} end called with the end value
 * @param {function(*)} error called with the error
 * @param {{dispose: function}} disposable
 * @constructor
 */
function Drain (end, error, disposable) {
  this._end = end;
  this._error = error;
  this._disposable = disposable;
  this.active = true;
}

Drain.prototype.event = function (t, x) {};

Drain.prototype.end = function (t, x) {
  if (!this.active) {
    return;
  }
  this.active = false;
  disposeThen(this._end, this._error, this._disposable, x);
};

Drain.prototype.error = function (t, e) {
  this.active = false;
  disposeThen(this._error, this._error, this._disposable, e);
};

// Dispose first; call end(x) on success, or error with the disposal failure
function disposeThen (end, error, disposable, x) {
  Promise.resolve(disposable.dispose()).then(function () {
    end(x);
  }, error);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * A sink mixin that simply forwards event, end, and error to
 * another sink.
 * @param sink
 * @constructor
 */
function Pipe (sink) {
  this.sink = sink;
}

Pipe.prototype.event = function (t, x) {
  return this.sink.event(t, x);
};

Pipe.prototype.end = function (t, x) {
  return this.sink.end(t, x);
};

Pipe.prototype.error = function (t, e) {
  return this.sink.error(t, e);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Source that only lets through values satisfying a predicate.
 * @param {function(x:*):boolean} p filtering predicate
 * @param {{run:function}} source source to filter
 * @constructor
 */
function Filter (p, source) {
  this.p = p;
  this.source = source;
}

/**
 * Create a filtered source, fusing adjacent filter.filter if possible
 * @param {function(x:*):boolean} p filtering predicate
 * @param {{run:function}} source source to filter
 * @returns {Filter} filtered source
 */
Filter.create = function createFilter (p, source) {
  if (source instanceof Filter) {
    return new Filter(and(source.p, p), source.source);
  }

  return new Filter(p, source);
};

Filter.prototype.run = function (sink, scheduler) {
  return this.source.run(new FilterSink(this.p, sink), scheduler);
};

function FilterSink (p, sink) {
  this.p = p;
  this.sink = sink;
}

FilterSink.prototype.end = Pipe.prototype.end;
FilterSink.prototype.error = Pipe.prototype.error;

// Forward the event only when the predicate is truthy
FilterSink.prototype.event = function (t, x) {
  var p = this.p;
  p(x) && this.sink.event(t, x);
};

// Predicate conjunction: q is only consulted when p(x) is truthy
function and (p, q) {
  return function (x) {
    return p(x) && q(x);
  };
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Fused filter-then-map source.
 * @param {function(x:*):boolean} p filtering predicate
 * @param {function(*):*} f mapping function, applied only to values that pass p
 * @param {{run:function}} source
 * @constructor
 */
function FilterMap (p, f, source) {
  this.p = p;
  this.f = f;
  this.source = source;
}

FilterMap.prototype.run = function (sink, scheduler) {
  return this.source.run(new FilterMapSink(this.p, this.f, sink), scheduler);
};

function FilterMapSink (p, f, sink) {
  this.p = p;
  this.f = f;
  this.sink = sink;
}

FilterMapSink.prototype.event = function (t, x) {
  var f = this.f;
  var p = this.p;
  p(x) && this.sink.event(t, f(x));
};

FilterMapSink.prototype.end = Pipe.prototype.end;
FilterMapSink.prototype.error = Pipe.prototype.error;
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Source that transforms every value with f.
 * @param {function(*):*} f mapping function
 * @param {{run:function}} source source to map
 * @constructor
 */
function Map$1 (f, source) {
  this.f = f;
  this.source = source;
}

/**
 * Create a mapped source, fusing adjacent map.map and filter.map
 * if possible
 * @param {function(*):*} f mapping function
 * @param {{run:function}} source source to map
 * @returns {Map|FilterMap} mapped source, possibly fused
 */
Map$1.create = function createMap (f, source) {
  if (source instanceof Map$1) {
    return new Map$1(compose(f, source.f), source.source);
  }

  if (source instanceof Filter) {
    return new FilterMap(source.p, f, source.source);
  }

  return new Map$1(f, source);
};

Map$1.prototype.run = function (sink, scheduler) { // eslint-disable-line no-extend-native
  return this.source.run(new MapSink(this.f, sink), scheduler);
};

function MapSink (f, sink) {
  this.f = f;
  this.sink = sink;
}

MapSink.prototype.end = Pipe.prototype.end;
MapSink.prototype.error = Pipe.prototype.error;

MapSink.prototype.event = function (t, x) {
  var f = this.f;
  this.sink.event(t, f(x));
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Transform each value in the stream by applying f to each
 * @param {function(*):*} f mapping function
 * @param {Stream} stream stream to map
 * @returns {Stream} stream containing items transformed by f
 */
function map$3 (f, stream) {
  return new Stream(Map$1.create(f, stream.source));
}
/**
 * Replace each value in the stream with x
 * @param {*} x
 * @param {Stream} stream
 * @returns {Stream} stream containing items replaced with x
 */
function constant (x, stream) {
  return map$3(function () {
    return x;
  }, stream);
}
/**
 * Perform a side effect for each item in the stream
 * @param {function(x:*):*} f side effect to execute for each item. The
 *  return value will be discarded.
 * @param {Stream} stream stream to tap
 * @returns {Stream} new stream containing the same items as this stream
 */
function tap (f, stream) {
  return new Stream(new Tap(f, stream.source));
}

function Tap (f, source) {
  this.source = source;
  this.f = f;
}

Tap.prototype.run = function (sink, scheduler) {
  return this.source.run(new TapSink(this.f, sink), scheduler);
};

function TapSink (f, sink) {
  this.sink = sink;
  this.f = f;
}

TapSink.prototype.end = Pipe.prototype.end;
TapSink.prototype.error = Pipe.prototype.error;

// NOTE(review): the f(x) call was missing from the listing; restored since
// `f` is otherwise read but never used — confirm against upstream.
TapSink.prototype.event = function (t, x) {
  var f = this.f;
  f(x);
  this.sink.event(t, x);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Observe all the event values in the stream in time order. The
 * provided function `f` will be called for each event value
 * @param {function(x:T):*} f function to call with each event value
 * @param {Stream<T>} stream stream to observe
 * @return {Promise} promise that fulfills after the stream ends without
 *  an error, or rejects if the stream ends with an error.
 */
function observe (f, stream) {
  return drain(tap(f, stream));
}

/**
 * "Run" a stream by creating demand and consuming all events
 * @param {Stream<T>} stream stream to drain
 * @return {Promise} promise that fulfills after the stream ends without
 *  an error, or rejects if the stream ends with an error.
 */
function drain (stream) {
  return withDefaultScheduler(stream.source);
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Generalized feedback loop. Call a stepper function for each event. The stepper
 * will be called with 2 params: the current seed and an event value. It must
 * return a new { seed, value } pair. The `seed` will be fed back into the next
 * invocation of stepper, and the `value` will be propagated as the event value.
 * @param {function(seed:*, value:*):{seed:*, value:*}} stepper loop step function
 * @param {*} seed initial seed value passed to first stepper call
 * @param {Stream} stream event stream
 * @returns {Stream} new stream whose values are the `value` field of the objects
 * returned by the stepper
 */
function loop (stepper, seed, stream) {
  return new Stream(new Loop(stepper, seed, stream.source))
}
/**
 * Source for `loop`: holds the stepper, the initial seed and the upstream source.
 * @constructor
 */
function Loop (stepper, seed, source) {
  this.step = stepper;
  this.seed = seed;
  this.source = source;
}

// Each run gets a fresh LoopSink starting from the initial seed
Loop.prototype.run = function (sink, scheduler) {
  return this.source.run(new LoopSink(this.step, this.seed, sink), scheduler)
};
/**
 * Sink that threads a seed through `stepper` and forwards each step's `value`.
 * @constructor
 */
function LoopSink (stepper, seed, sink) {
  this.step = stepper;
  this.seed = seed;
  this.sink = sink;
}

LoopSink.prototype.error = Pipe.prototype.error;

LoopSink.prototype.event = function (t, x) {
  var result = this.step(this.seed, x);
  this.seed = result.seed;
  this.sink.event(t, result.value);
};

// The end value is the last seed, not the upstream end value
LoopSink.prototype.end = function (t) {
  this.sink.end(t, this.seed);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream containing successive reduce results of applying f to
 * the previous reduce result and the current stream item.
 * @param {function(result:*, x:*):*} f reducer function
 * @param {*} initial initial value
 * @param {Stream} stream stream to scan
 * @returns {Stream} new stream containing successive reduce results
 */
function scan (f, initial, stream) {
  return new Stream(new Scan(f, initial, stream.source))
}
/**
 * Source for `scan`: holds the reducer, the initial value and the upstream source.
 * @constructor
 */
function Scan (f, z, source) {
  this.source = source;
  this.f = f;
  this.value = z;
}

Scan.prototype.run = function (sink, scheduler) {
  // d1: scheduled task that propagates the initial value as an event
  var d1 = scheduler.asap(PropagateTask.event(this.value, sink));
  // d2: the upstream run, accumulating through a ScanSink
  var d2 = this.source.run(new ScanSink(this.f, this.value, sink), scheduler);
  return all$1([d1, d2])
};
/**
 * Sink that folds each event into an accumulator and emits the accumulator.
 * @constructor
 */
function ScanSink (f, z, sink) {
  this.f = f;
  this.value = z;
  this.sink = sink;
}

ScanSink.prototype.event = function (t, x) {
  var f = this.f;
  this.value = f(this.value, x);
  this.sink.event(t, this.value);
};

ScanSink.prototype.error = Pipe.prototype.error;
ScanSink.prototype.end = Pipe.prototype.end;
/**
 * Reduce a stream to produce a single result. Note that reducing an infinite
 * stream will return a Promise that never fulfills, but that may reject if an error
 * occurs.
 * @param {function(result:*, x:*):*} f reducer function
 * @param {*} initial initial value
 * @param {Stream} stream to reduce
 * @returns {Promise} promise for the final result of the reduce
 */
function reduce$1 (f, initial, stream) {
  return withDefaultScheduler(new Reduce(f, initial, stream.source))
}
/**
 * Source for `reduce`: holds the reducer, the initial value and the upstream source.
 * @constructor
 */
function Reduce (f, z, source) {
  this.source = source;
  this.f = f;
  this.value = z;
}

Reduce.prototype.run = function (sink, scheduler) {
  return this.source.run(new ReduceSink(this.f, this.value, sink), scheduler)
};
/**
 * Sink that folds each event into an accumulator and ends with the accumulator.
 * @constructor
 */
function ReduceSink (f, z, sink) {
  this.f = f;
  this.value = z;
  this.sink = sink;
}

ReduceSink.prototype.event = function (t, x) {
  var f = this.f;
  this.value = f(this.value, x);
  this.sink.event(t, this.value);
};

ReduceSink.prototype.error = Pipe.prototype.error;

// The end value is the accumulated result, not the upstream end value
ReduceSink.prototype.end = function (t) {
  this.sink.end(t, this.value);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Compute a stream by unfolding tuples of future values from a seed value
 * Event times may be controlled by returning a Promise from f
 * @param {function(seed:*):{value:*, seed:*, done:boolean}|Promise<{value:*, seed:*, done:boolean}>} f unfolding function accepts
 *  a seed and returns a new tuple with a value, new seed, and boolean done flag.
 *  If tuple.done is true, the stream will end.
 * @param {*} seed seed value
 * @returns {Stream} stream containing all value of all tuples produced by the
 *  unfolding function.
 */
function unfold (f, seed) {
  return new Stream(new UnfoldSource(f, seed))
}
/**
 * Source for `unfold`: holds the unfolding function and the initial seed.
 * @constructor
 */
function UnfoldSource (f, seed) {
  this.f = f;
  this.value = seed;
}

// The returned Unfold is its own disposable
UnfoldSource.prototype.run = function (sink, scheduler) {
  return new Unfold(this.f, this.value, sink, scheduler)
};
/**
 * Running unfold: repeatedly calls `f` with the current seed and pushes the
 * resulting tuple values into `sink` until a tuple is done or it is disposed.
 * @param {function(seed:*):*} f unfolding function (may return a promise)
 * @param {*} x initial seed
 * @param {{event:function, end:function, error:function}} sink
 * @param {{now:function():number}} scheduler used only to timestamp events
 * @constructor
 */
function Unfold (f, x, sink, scheduler) {
  this.f = f;
  this.sink = sink;
  this.scheduler = scheduler;
  this.active = true;

  var self = this;
  function err (e) {
    self.sink.error(self.scheduler.now(), e);
  }

  function start (unfold) {
    return stepUnfold(unfold, x)
  }

  // Start asynchronously; any thrown error or rejection goes to sink.error
  Promise.resolve(this).then(start).catch(err);
}

// Stops the loop after the event currently in flight
Unfold.prototype.dispose = function () {
  this.active = false;
};

// Call f with the seed and continue once its (possibly async) tuple is available
function stepUnfold (unfold, x) {
  var f = unfold.f;
  return Promise.resolve(f(x)).then(function (tuple) {
    return continueUnfold(unfold, tuple)
  })
}

// Emit the tuple: end when done, otherwise emit and recurse with the new seed
function continueUnfold (unfold, tuple) {
  if (tuple.done) {
    unfold.sink.end(unfold.scheduler.now(), tuple.value);
    return tuple.value
  }

  unfold.sink.event(unfold.scheduler.now(), tuple.value);

  if (!unfold.active) {
    return tuple.value
  }
  return stepUnfold(unfold, tuple.seed)
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Compute a stream by iteratively calling f to produce values
 * Event times may be controlled by returning a Promise from f
 * @param {function(x:*):*|Promise<*>} f
 * @param {*} x initial value
 * @returns {Stream}
 */
function iterate (f, x) {
  return new Stream(new IterateSource(f, x))
}
/**
 * Source for `iterate`: holds the iterating function and the initial value.
 * @constructor
 */
function IterateSource (f, x) {
  this.f = f;
  this.value = x;
}

// The returned Iterate is its own disposable
IterateSource.prototype.run = function (sink, scheduler) {
  return new Iterate(this.f, this.value, sink, scheduler)
};
/**
 * Running iterate: emits `initial`, then f(initial), f(f(initial)), ...
 * until disposed.
 * @param {function(x:*):*} f iterating function (may return a promise)
 * @param {*} initial first value to emit
 * @param {{event:function, end:function, error:function}} sink
 * @param {{now:function():number}} scheduler used only to timestamp events
 * @constructor
 */
function Iterate (f, initial, sink, scheduler) {
  this.f = f;
  this.sink = sink;
  this.scheduler = scheduler;
  this.active = true;

  var x = initial;

  var self = this;
  function err (e) {
    self.sink.error(self.scheduler.now(), e);
  }

  function start (iterate) {
    return stepIterate(iterate, x)
  }

  // Start asynchronously; any thrown error or rejection goes to sink.error
  Promise.resolve(this).then(start).catch(err);
}

Iterate.prototype.dispose = function () {
  this.active = false;
};

// Emit x, then compute the next value unless disposed in the meantime
function stepIterate (iterate, x) {
  iterate.sink.event(iterate.scheduler.now(), x);

  if (!iterate.active) {
    return x
  }

  var f = iterate.f;
  return Promise.resolve(f(x)).then(function (y) {
    return continueIterate(iterate, y)
  })
}

// NOTE: the constructor never assigns `iterate.value`, so the inactive branch
// yields undefined; the result only settles an internal promise.
function continueIterate (iterate, x) {
  return !iterate.active ? iterate.value : stepIterate(iterate, x)
}
/** @license MIT License (c) copyright 2010-2014 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Compute a stream using an *async* generator, which yields promises
 * to control event times.
 * @param f generator function; it is called with any extra arguments
 *  passed after `f`
 * @returns {Stream}
 */
function generate (f /*, ...args */) {
  return new Stream(new GenerateSource(f, tail(arguments)))
}
/**
 * Source for `generate`: holds the generator function and its arguments.
 * @constructor
 */
function GenerateSource (f, args) {
  this.f = f;
  this.args = args;
}

// Each run calls the generator function again to get a fresh iterator
GenerateSource.prototype.run = function (sink, scheduler) {
  return new Generate(this.f.apply(void 0, this.args), sink, scheduler)
};
/**
 * Running generate: pulls values from `iterator`, awaits each one, and pushes
 * the settled value into `sink` until the iterator is done or it is disposed.
 * @param {{next:function, throw:function}} iterator
 * @param {{event:function, end:function, error:function}} sink
 * @param {{now:function():number}} scheduler used only to timestamp events
 * @constructor
 */
function Generate (iterator, sink, scheduler) {
  this.iterator = iterator;
  this.sink = sink;
  this.scheduler = scheduler;
  this.active = true;

  var self = this;
  function err (e) {
    self.sink.error(self.scheduler.now(), e);
  }

  // Start asynchronously; any thrown error or rejection goes to sink.error
  Promise.resolve(this).then(next).catch(err);
}

// Advance the iterator, feeding the previous value back in, unless disposed
function next (generate, x) {
  return generate.active ? handle(generate, generate.iterator.next(x)) : x
}

// End on done; otherwise await the yielded value and emit it, or throw a
// rejection back into the iterator
function handle (generate, result) {
  if (result.done) {
    return generate.sink.end(generate.scheduler.now(), result.value)
  }

  return Promise.resolve(result.value).then(function (x) {
    return emit$1(generate, x)
  }, function (e) {
    return error$1(generate, e)
  })
}

function emit$1 (generate, x) {
  generate.sink.event(generate.scheduler.now(), x);
  return next(generate, x)
}

function error$1 (generate, e) {
  return handle(generate, generate.iterator.throw(e))
}

Generate.prototype.dispose = function () {
  this.active = false;
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream that behaves like `stream` until it ends, then continues
 * with the stream returned by calling `f` with the end value.
 * @param {function(x:*):Stream} f called with the end value of `stream`
 * @param {Stream} stream
 * @returns {Stream}
 */
function continueWith (f, stream) {
  return new Stream(new ContinueWith(f, stream.source))
}

/**
 * Source for `continueWith`.
 * @constructor
 */
function ContinueWith (f, source) {
  this.f = f;
  this.source = source;
}

// The returned ContinueWithSink is both the sink for `source` and the disposable
ContinueWith.prototype.run = function (sink, scheduler) {
  return new ContinueWithSink(this.f, this.source, sink, scheduler)
};
/**
 * Sink/disposable for `continueWith`. Runs `source` into itself, forwards
 * events, and when `source` ends runs the stream returned by `f` directly
 * into the downstream sink.
 * @constructor
 */
function ContinueWithSink (f, source, sink, scheduler) {
  this.f = f;
  this.sink = sink;
  this.scheduler = scheduler;
  this.active = true;
  this.disposable = once(source.run(this, scheduler));
}

ContinueWithSink.prototype.error = Pipe.prototype.error;

ContinueWithSink.prototype.event = function (t, x) {
  if (!this.active) {
    return
  }
  this.sink.event(t, x);
};

ContinueWithSink.prototype.end = function (t, x) {
  if (!this.active) {
    return
  }

  // Dispose the finished first stream, then start its continuation
  tryDispose(t, this.disposable, this.sink);
  this._startNext(t, x, this.sink);
};

// Errors thrown while creating/running the continuation go to sink.error
ContinueWithSink.prototype._startNext = function (t, x, sink) {
  try {
    this.disposable = this._continue(this.f, x, sink);
  } catch (e) {
    sink.error(t, e);
  }
};

ContinueWithSink.prototype._continue = function (f, x, sink) {
  return f(x).source.run(sink, this.scheduler)
};

ContinueWithSink.prototype.dispose = function () {
  this.active = false;
  return this.disposable.dispose()
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * @param {*} x value to prepend
 * @param {Stream} stream
 * @returns {Stream} new stream with x prepended
 */
function cons$1 (x, stream) {
  return concat(of(x), stream)
}
/**
 * @param {Stream} left
 * @param {Stream} right
 * @returns {Stream} new stream containing all events in left followed by all
 *  events in right.  This *timeshifts* right to the end of left.
 */
function concat (left, right) {
  return continueWith(function () {
    return right
  }, left)
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Sink that tags what it forwards with a fixed index, so one downstream sink
 * can tell several upstream sources apart.
 * @param {number} i index to report
 * @param {{event:function, end:function, error:function}} sink downstream sink
 * @constructor
 */
function IndexSink (i, sink) {
  this.sink = sink;
  this.index = i;
  this.active = true;
  this.value = void 0;
}

// Forwards *this* (carrying .index and the latest .value), not a new object
IndexSink.prototype.event = function (t, x) {
  if (!this.active) {
    return
  }
  this.value = x;
  this.sink.event(t, this);
};

// Ends at most once, forwarding an { index, value } pair
IndexSink.prototype.end = function (t, x) {
  if (!this.active) {
    return
  }
  this.active = false;
  this.sink.end(t, { index: this.index, value: x });
};

IndexSink.prototype.error = Pipe.prototype.error;
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Call `f` with the elements of `args` as positional arguments.
 * Up to 5 arguments are passed with a direct call; longer lists fall back
 * to f.apply.
 * @param {function} f function to call (called without a receiver)
 * @param {Array|{length:number}} args arguments to pass
 * @returns {*} result of calling f
 */
function invoke (f, args) {
  /* eslint complexity: [2,7] */
  switch (args.length) {
    case 0: return f()
    case 1: return f(args[0])
    case 2: return f(args[0], args[1])
    case 3: return f(args[0], args[1], args[2])
    case 4: return f(args[0], args[1], args[2], args[3])
    case 5: return f(args[0], args[1], args[2], args[3], args[4])
    default:
      return f.apply(void 0, args)
  }
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Local aliases (used by the combine functions below) for helpers defined elsewhere in this bundle
var map$4 = map$1;
var tail$1 = tail;
/**
 * Combine latest events from all input streams
 * @param {function(...events):*} f function to combine most recent events
 * @returns {Stream} stream containing the result of applying f to the most recent
 *  event of each input stream, whenever a new event arrives on any stream.
 */
function combine (f /*, ...streams */) {
  return combineArray(f, tail$1(arguments))
}
/**
 * Combine latest events from all input streams
 * @param {function(...events):*} f function to combine most recent events
 * @param {[Stream]} streams most recent events
 * @returns {Stream} stream containing the result of applying f to the most recent
 *  event of each input stream, whenever a new event arrives on any stream.
 */
function combineArray (f, streams) {
  var l = streams.length;
  // 0 streams: empty stream; 1 stream: plain map; otherwise a real Combine
  return l === 0 ? empty$$1()
    : l === 1 ? map$3(f, streams[0])
    : new Stream(combineSources(f, streams))
}
// Build a Combine source over the `.source` of each stream
function combineSources (f, streams) {
  return new Combine(f, map$4(getSource, streams))
}

// Extract the underlying source of a stream
function getSource (stream) {
  return stream.source
}
/**
 * Source for `combine`: holds the combining function and the input sources.
 * @constructor
 */
function Combine (f, sources) {
  this.f = f;
  this.sources = sources;
}

Combine.prototype.run = function (sink, scheduler) {
  var l = this.sources.length;
  var disposables = new Array(l);
  var sinks = new Array(l);

  // All inputs feed one CombineSink, each through its own IndexSink
  var mergeSink = new CombineSink(disposables, sinks, sink, this.f);

  for (var indexSink, i = 0; i < l; ++i) {
    indexSink = sinks[i] = new IndexSink(i, mergeSink);
    disposables[i] = this.sources[i].run(indexSink, scheduler);
  }

  return all$1(disposables)
};
/**
 * Sink shared by all inputs of a Combine. Remembers the latest value per
 * input index and, once every input has produced a value, emits f applied
 * to the latest values on each new event.
 * @constructor
 */
function CombineSink (disposables, sinks, sink, f) {
  this.sink = sink;
  this.disposables = disposables;
  this.sinks = sinks;
  this.f = f;

  var l = sinks.length;
  this.awaiting = l; // number of inputs that have not produced a value yet
  this.values = new Array(l);
  this.hasValue = new Array(l);
  for (var i = 0; i < l; ++i) {
    this.hasValue[i] = false;
  }

  this.activeCount = sinks.length; // number of inputs that have not ended yet
}

CombineSink.prototype.error = Pipe.prototype.error;

CombineSink.prototype.event = function (t, indexedValue) {
  var i = indexedValue.index;
  var awaiting = this._updateReady(i);

  this.values[i] = indexedValue.value;
  if (awaiting === 0) {
    this.sink.event(t, invoke(this.f, this.values));
  }
};

// Mark input `index` as having a value; returns how many inputs are still awaited
CombineSink.prototype._updateReady = function (index) {
  if (this.awaiting > 0) {
    if (!this.hasValue[index]) {
      this.hasValue[index] = true;
      this.awaiting -= 1;
    }
  }
  return this.awaiting
};

// Dispose the ended input; end downstream when the last input ends
CombineSink.prototype.end = function (t, indexedValue) {
  tryDispose(t, this.disposables[indexedValue.index], this.sink);
  if (--this.activeCount === 0) {
    this.sink.end(t, indexedValue.value);
  }
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Assume fs is a stream containing functions, and apply the latest function
 * in fs to the latest value in xs.
 * fs:         --f---------g--------h------>
 * xs:         -a-------b-------c-------d-->
 * ap(fs, xs): --fa-----fb-gb---gc--hc--hd->
 * @param {Stream} fs stream of functions to apply to the latest x
 * @param {Stream} xs stream of values to which to apply all the latest f
 * @returns {Stream} stream containing all the applications of fs to xs
 */
function ap (fs, xs) {
  return combine(apply, fs, xs)
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Transform a stream by passing its events through a transducer.
 * @param {function} transducer transducer function
 * @param {Stream} stream stream whose events will be passed through the
 *  transducer
 * @return {Stream} stream of events transformed by the transducer
 */
function transduce (transducer, stream) {
  return new Stream(new Transduce(transducer, stream.source))
}
/**
 * Source for `transduce`: holds the transducer and the upstream source.
 * @constructor
 */
function Transduce (transducer, source) {
  this.transducer = transducer;
  this.source = source;
}

Transduce.prototype.run = function (sink, scheduler) {
  // The transducer wraps a Transformer that writes into `sink`
  var xf = this.transducer(new Transformer(sink));
  return this.source.run(new TransduceSink(getTxHandler(xf), sink), scheduler)
};
/**
 * Sink that feeds events into a transducer adapter.
 * @param {{step:function, result:function, isReduced:function, getResult:function}} adapter
 * @param {{end:function, error:function}} sink downstream sink
 * @constructor
 */
function TransduceSink (adapter, sink) {
  this.xf = adapter;
  this.sink = sink;
}

// Step the transducer; if it signals "reduced", end downstream with its result
TransduceSink.prototype.event = function (t, x) {
  var next = this.xf.step(t, x);

  return this.xf.isReduced(next)
    ? this.sink.end(t, this.xf.getResult(next))
    : next
};

TransduceSink.prototype.end = function (t, x) {
  return this.xf.result(x)
};

TransduceSink.prototype.error = function (t, e) {
  return this.sink.error(t, e)
};
/**
 * Innermost transformer handed to the transducer: writes stepped values into
 * `sink`. Exposes both the `@@transducer/*` and the legacy method names.
 * @param {{event:function, end:function}} sink
 * @constructor
 */
function Transformer (sink) {
  this.time = -Infinity;
  this.sink = sink;
}

Transformer.prototype['@@transducer/init'] = Transformer.prototype.init = function () {};

// Track the largest numeric time seen so far; NaN times leave it unchanged
Transformer.prototype['@@transducer/step'] = Transformer.prototype.step = function (t, x) {
  if (!isNaN(t)) {
    this.time = Math.max(t, this.time);
  }
  return this.sink.event(this.time, x)
};

Transformer.prototype['@@transducer/result'] = Transformer.prototype.result = function (x) {
  return this.sink.end(this.time, x)
};
/**
 * Given an object supporting the new or legacy transducer protocol,
 * create an adapter for it.
 * @param {object} tx transform
 * @returns {TxAdapter|LegacyTxAdapter}
 */
function getTxHandler (tx) {
  return typeof tx['@@transducer/step'] === 'function'
    ? new TxAdapter(tx)
    : new LegacyTxAdapter(tx)
}
/**
 * Adapter for new official transducer protocol
 * @param {object} tx transform
 * @constructor
 */
function TxAdapter (tx) {
  this.tx = tx;
}

TxAdapter.prototype.step = function (t, x) {
  return this.tx['@@transducer/step'](t, x)
};

TxAdapter.prototype.result = function (x) {
  return this.tx['@@transducer/result'](x)
};

// null/undefined are never "reduced"; otherwise the flag value is returned as-is
TxAdapter.prototype.isReduced = function (x) {
  return x != null && x['@@transducer/reduced']
};

TxAdapter.prototype.getResult = function (x) {
  return x['@@transducer/value']
};
/**
 * Adapter for older transducer protocol
 * @param {object} tx transform
 * @constructor
 */
function LegacyTxAdapter (tx) {
  this.tx = tx;
}

LegacyTxAdapter.prototype.step = function (t, x) {
  return this.tx.step(t, x)
};

LegacyTxAdapter.prototype.result = function (x) {
  return this.tx.result(x)
};

// null/undefined are never "reduced"; otherwise the flag value is returned as-is
LegacyTxAdapter.prototype.isReduced = function (x) {
  return x != null && x.__transducers_reduced__
};

LegacyTxAdapter.prototype.getResult = function (x) {
  return x.value
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Doubly linked list
 * @constructor
 */
function LinkedList () {
  this.head = null;
  this.length = 0;
}

/**
 * Add a node to the list. The node is linked in front of the current head
 * and becomes the new head.
 * @param {{prev:Object|null, next:Object|null, dispose:function}} x node to add
 */
LinkedList.prototype.add = function (x) {
  if (this.head !== null) {
    this.head.prev = x;
    x.next = this.head;
  }
  this.head = x;
  ++this.length;
};

/**
 * Remove the provided node from the list
 * @param {{prev:Object|null, next:Object|null, dispose:function}} x node to remove
 */
LinkedList.prototype.remove = function (x) { // eslint-disable-line complexity
  // Capture both neighbours first: clearing x.next before relinking x.prev
  // would cut every node after x out of the list.
  var prev = x.prev;
  var next = x.next;

  --this.length;
  if (x === this.head) {
    this.head = next;
  }
  if (next !== null) {
    next.prev = prev;
  }
  if (prev !== null) {
    prev.next = next;
  }
  x.prev = null;
  x.next = null;
};

/**
 * @returns {boolean} true iff there are no nodes in the list
 */
LinkedList.prototype.isEmpty = function () {
  return this.length === 0
};

/**
 * Dispose all nodes
 * @returns {Promise} promise that fulfills when all nodes have been disposed,
 *  or rejects if an error occurs while disposing
 */
LinkedList.prototype.dispose = function () {
  if (this.isEmpty()) {
    return Promise.resolve()
  }

  var promises = [];
  var x = this.head;
  // Empty the list before disposing so it is already clear if a dispose re-enters
  this.head = null;
  this.length = 0;

  while (x !== null) {
    promises.push(x.dispose());
    x = x.next;
  }

  return Promise.all(promises)
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Merge a stream of streams, running at most `concurrency` inner streams at once.
 * @param {number} concurrency maximum number of inner streams run at once
 * @param {Stream} stream stream of streams
 * @returns {Stream}
 */
function mergeConcurrently (concurrency, stream) {
  return mergeMapConcurrently(id, concurrency, stream)
}

/**
 * Map each event to a stream with `f` and merge the results, running at most
 * `concurrency` inner streams at once.
 * @param {function(x:*):Stream} f
 * @param {number} concurrency maximum number of inner streams run at once
 * @param {Stream} stream
 * @returns {Stream}
 */
function mergeMapConcurrently (f, concurrency, stream) {
  return new Stream(new MergeConcurrently(f, concurrency, stream.source))
}
/**
 * Source for `mergeMapConcurrently`.
 * @constructor
 */
function MergeConcurrently (f, concurrency, source) {
  this.f = f;
  this.concurrency = concurrency;
  this.source = source;
}

// The returned Outer is both the sink for `source` and the disposable
MergeConcurrently.prototype.run = function (sink, scheduler) {
  return new Outer(this.f, this.concurrency, this.source, sink, scheduler)
};
/**
 * Sink/disposable for the outer stream of a concurrent merge. Each outer
 * event is mapped to an inner stream; at most `concurrency` inner streams run
 * at once (tracked in `current`), and further values wait in `pending`.
 * @constructor
 */
function Outer (f, concurrency, source, sink, scheduler) {
  this.f = f;
  this.concurrency = concurrency;
  this.sink = sink;
  this.scheduler = scheduler;
  this.pending = [];
  this.current = new LinkedList();
  this.disposable = once(source.run(this, scheduler));
  this.active = true;
}

Outer.prototype.event = function (t, x) {
  this._addInner(t, x);
};

// Start an inner stream now if there is capacity, otherwise queue the value
Outer.prototype._addInner = function (t, x) {
  if (this.current.length < this.concurrency) {
    this._startInner(t, x);
  } else {
    this.pending.push(x);
  }
};

// Errors thrown while creating/running the inner stream are routed to error()
Outer.prototype._startInner = function (t, x) {
  try {
    this._initInner(t, x);
  } catch (e) {
    this.error(t, e);
  }
};

Outer.prototype._initInner = function (t, x) {
  var innerSink = new Inner(t, this, this.sink);
  innerSink.disposable = mapAndRun(this.f, x, innerSink, this.scheduler);
  this.current.add(innerSink);
};

// Map x to a stream and run that stream's source into sink
function mapAndRun (f, x, sink, scheduler) {
  return f(x).source.run(sink, scheduler)
}

Outer.prototype.end = function (t, x) {
  this.active = false;
  tryDispose(t, this.disposable, this.sink);
  this._checkEnd(t, x);
};

Outer.prototype.error = function (t, e) {
  this.active = false;
  this.sink.error(t, e);
};

Outer.prototype.dispose = function () {
  this.active = false;
  this.pending.length = 0;
  return Promise.all([this.disposable.dispose(), this.current.dispose()])
};

// An inner stream ended: release it, then start a queued value or check for the overall end
Outer.prototype._endInner = function (t, x, inner) {
  this.current.remove(inner);
  tryDispose(t, inner, this);

  if (this.pending.length === 0) {
    this._checkEnd(t, x);
  } else {
    this._startInner(t, this.pending.shift());
  }
};

// End downstream only once the outer stream has ended and no inner stream is running
Outer.prototype._checkEnd = function (t, x) {
  if (!this.active && this.current.isEmpty()) {
    this.sink.end(t, x);
  }
};
/**
 * Sink for one inner stream of a concurrent merge. Also a linked-list node
 * (prev/next) and a disposable.
 * @param {number} time lower bound applied to every forwarded time
 * @param {{_endInner:function, error:function}} outer owning outer sink
 * @param {{event:function}} sink downstream sink
 * @constructor
 */
function Inner (time, outer, sink) {
  this.prev = this.next = null;
  this.time = time;
  this.outer = outer;
  this.sink = sink;
  this.disposable = void 0; // assigned by the creator after construction
}

// Forwarded times are never earlier than `this.time`
Inner.prototype.event = function (t, x) {
  this.sink.event(Math.max(t, this.time), x);
};

Inner.prototype.end = function (t, x) {
  this.outer._endInner(Math.max(t, this.time), x, this);
};

Inner.prototype.error = function (t, e) {
  this.outer.error(Math.max(t, this.time), e);
};

Inner.prototype.dispose = function () {
  return this.disposable.dispose()
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Map each value in the stream to a new stream, and merge it into the
 * returned outer stream. Event arrival times are preserved.
 * @param {function(x:*):Stream} f chaining function, must return a Stream
 * @param {Stream} stream
 * @returns {Stream} new stream containing all events from each stream returned by f
 */
function flatMap (f, stream) {
  return mergeMapConcurrently(f, Infinity, stream)
}
/**
 * Monadic join. Flatten a Stream<Stream<X>> to Stream<X> by merging inner
 * streams to the outer. Event arrival times are preserved.
 * @param {Stream<Stream<X>>} stream stream of streams
 * @returns {Stream<X>} new stream containing all events of all inner streams
 */
function join (stream) {
  return mergeConcurrently(Infinity, stream)
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Map each value in stream to a new stream, and concatenate them all
 * stream:              -a---b---cX
 * f(a):                 1-1-1-1X
 * f(b):                        -2-2-2-2X
 * f(c):                                -3-3-3-3X
 * stream.concatMap(f): -1-1-1-1-2-2-2-2-3-3-3-3X
 * @param {function(x:*):Stream} f function to map each value to a stream
 * @param {Stream} stream
 * @returns {Stream} new stream containing all events from each stream returned by f
 */
function concatMap (f, stream) {
  // concurrency 1: the next inner stream starts only after the previous one ends
  return mergeMapConcurrently(f, 1, stream)
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Local aliases (used by the merge functions below) for helpers defined elsewhere in this bundle
var copy$1 = copy;
var reduce$2 = reduce;
/**
 * @returns {Stream} stream containing events from all streams in the argument
 * list in time order.  If two events are simultaneous they will be merged in
 * arbitrary order.
 */
function merge$1 (/* ...streams */) {
  return mergeArray(copy$1(arguments))
}
/**
 * @param {Array} streams array of stream to merge
 * @returns {Stream} stream containing events from all input observables
 * in time order.  If two events are simultaneous they will be merged in
 * arbitrary order.
 */
function mergeArray (streams) {
  var l = streams.length;
  // 0 streams: empty stream; 1 stream: that stream itself; otherwise a real Merge
  return l === 0 ? empty$$1()
    : l === 1 ? streams[0]
    : new Stream(mergeSources(streams))
}
/**
 * This implements fusion/flattening for merge.  It will
 * fuse adjacent merge operations.  For example:
 * - a.merge(b).merge(c) effectively becomes merge(a, b, c)
 * - merge(a, merge(b, c)) effectively becomes merge(a, b, c)
 * It does this by concatenating the sources arrays of
 * any nested Merge sources, in effect "flattening" nested
 * merge operations into a single merge.
 */
function mergeSources (streams) {
  return new Merge$1(reduce$2(appendSources, [], streams))
}

// Append a stream's source, or all of its sources when it is itself a Merge
function appendSources (sources, stream) {
  var source = stream.source;
  return source instanceof Merge$1
    ? sources.concat(source.sources)
    : sources.concat(source)
}
/**
 * Source that merges the events of several sources.
 * @param {Array} sources sources to merge
 * @constructor
 */
function Merge$1 (sources) {
  this.sources = sources;
}

Merge$1.prototype.run = function (sink, scheduler) {
  var l = this.sources.length;
  var disposables = new Array(l);
  var sinks = new Array(l);

  // All inputs feed one MergeSink, each through its own IndexSink
  var mergeSink = new MergeSink(disposables, sinks, sink);

  for (var indexSink, i = 0; i < l; ++i) {
    indexSink = sinks[i] = new IndexSink(i, mergeSink);
    disposables[i] = this.sources[i].run(indexSink, scheduler);
  }

  return all$1(disposables)
};
/**
 * Sink shared by all inputs of a Merge. Forwards every event value and ends
 * downstream when the last input ends.
 * @constructor
 */
function MergeSink (disposables, sinks, sink) {
  this.sink = sink;
  this.disposables = disposables;
  this.activeCount = sinks.length; // number of inputs that have not ended yet
}

MergeSink.prototype.error = Pipe.prototype.error;

MergeSink.prototype.event = function (t, indexValue) {
  this.sink.event(t, indexValue.value);
};

// Dispose the ended input; end downstream when the last input ends
MergeSink.prototype.end = function (t, indexedValue) {
  tryDispose(t, this.disposables[indexedValue.index], this.sink);
  if (--this.activeCount === 0) {
    this.sink.end(t, indexedValue.value);
  }
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * When an event arrives on sampler, emit the result of calling f with the latest
 * values of all streams being sampled
 * @param {function(...values):*} f function to apply to each set of sampled values
 * @param {Stream} sampler streams will be sampled whenever an event arrives
 *  on sampler
 * @returns {Stream} stream of sampled and transformed values
 */
function sample (f, sampler /*, ...streams */) {
  return sampleArray(f, sampler, drop(2, arguments))
}
/**
 * When an event arrives on sampler, emit the latest event value from stream.
 * @param {Stream} sampler stream of events at whose arrival time
 *  stream's latest value will be propagated
 * @param {Stream} stream stream of values
 * @returns {Stream} sampled stream of values
 */
function sampleWith (sampler, stream) {
  return new Stream(new Sampler(id, sampler.source, [stream.source]))
}
/**
 * Array form of `sample`.
 * @param {function(...values):*} f function to apply to each set of sampled values
 * @param {Stream} sampler stream whose events trigger sampling
 * @param {Array} streams streams to sample
 * @returns {Stream} stream of sampled and transformed values
 */
function sampleArray (f, sampler, streams) {
  return new Stream(new Sampler(f, sampler.source, map$1(getSource$1, streams)))
}

// Extract the underlying source of a stream
function getSource$1 (stream) {
  return stream.source
}
/**
 * Source for `sample`: holds f, the sampler source and the sampled sources.
 * @constructor
 */
function Sampler (f, sampler, sources) {
  this.f = f;
  this.sampler = sampler;
  this.sources = sources;
}

Sampler.prototype.run = function (sink, scheduler) {
  var l = this.sources.length;
  var disposables = new Array(l + 1); // one per sampled source, plus the sampler
  var sinks = new Array(l);

  var sampleSink = new SampleSink(this.f, sinks, sink);

  for (var hold, i = 0; i < l; ++i) {
    hold = sinks[i] = new Hold(sampleSink);
    disposables[i] = this.sources[i].run(hold, scheduler);
  }

  // i === l here: the sampler is started last and fills the final slot
  disposables[i] = this.sampler.run(sampleSink, scheduler);

  return all$1(disposables)
};
/**
 * Sink that remembers the latest value of one sampled source.
 * @param {{_notify:function}} sink the owning SampleSink
 * @constructor
 */
function Hold (sink) {
  this.sink = sink;
  this.hasValue = false;
}

// Record the value and notify the owner so it can re-check readiness
Hold.prototype.event = function (t, x) {
  this.value = x;
  this.hasValue = true;
  this.sink._notify(this);
};

// The end of a sampled source is ignored
Hold.prototype.end = function () {};
Hold.prototype.error = Pipe.prototype.error;
/**
 * Sink for the sampler stream. Once every Hold has a value, each sampler
 * event emits f applied to the held values.
 * @constructor
 */
function SampleSink (f, sinks, sink) {
  this.f = f;
  this.sinks = sinks;
  this.sink = sink;
  this.active = false;
}

// Becomes active (and stays active) once every Hold has received a value
SampleSink.prototype._notify = function () {
  if (!this.active) {
    this.active = this.sinks.every(hasValue);
  }
};

// The sampler's own event value is not used, only its time
SampleSink.prototype.event = function (t) {
  if (this.active) {
    this.sink.event(t, invoke(this.f, map$1(getValue$1, this.sinks)));
  }
};

SampleSink.prototype.end = Pipe.prototype.end;
SampleSink.prototype.error = Pipe.prototype.error;
// Predicate: has this Hold received a value yet?
function hasValue (hold) {
  return hold.hasValue
}

// Latest value stored in a Hold
function getValue$1 (hold) {
  return hold.value
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Based on https://github.com/petkaantonov/deque

/**
 * FIFO queue backed by a circular buffer stored in numeric properties of the
 * instance. Indices wrap with a bit mask, so the capacity must be a power of 2.
 * @param {number} [capPow2=32] initial capacity, must be a power of 2
 * @constructor
 */
function Queue (capPow2) {
  this._capacity = capPow2 || 32;
  this._length = 0;
  this._head = 0;
}

/**
 * Append x to the back of the queue, growing the buffer if it is full.
 * @param {*} x
 */
Queue.prototype.push = function (x) {
  var len = this._length;
  this._checkCapacity(len + 1);

  var i = (this._head + len) & (this._capacity - 1);
  this[i] = x;
  this._length = len + 1;
};

/**
 * Remove and return the item at the front of the queue.
 * @returns {*} the removed item, or undefined if the queue is empty
 */
Queue.prototype.shift = function () {
  // Guard: shifting an empty queue must not move the head or make the length negative
  if (this._length === 0) {
    return void 0
  }

  var head = this._head;
  var x = this[head];

  this[head] = void 0;
  this._head = (head + 1) & (this._capacity - 1);
  this._length--;
  return x
};

Queue.prototype.isEmpty = function () {
  return this._length === 0
};

Queue.prototype.length = function () {
  return this._length
};

// Double the capacity when `size` items would not fit
Queue.prototype._checkCapacity = function (size) {
  if (this._capacity < size) {
    this._ensureCapacity(this._capacity << 1);
  }
};

Queue.prototype._ensureCapacity = function (capacity) {
  var oldCapacity = this._capacity;
  this._capacity = capacity;

  var last = this._head + this._length;

  // If the contents wrapped around the old buffer, move the wrapped part
  // (stored at the start) to just past the old end so the items are contiguous
  if (last > oldCapacity) {
    copy$2(this, 0, this, oldCapacity, last & (oldCapacity - 1));
  }
};

// Move len items from src[srcIndex...] to dst[dstIndex...], clearing the source slots
function copy$2 (src, srcIndex, dst, dstIndex, len) {
  for (var j = 0; j < len; ++j) {
    dst[j + dstIndex] = src[j + srcIndex];
    src[j + srcIndex] = void 0;
  }
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Local aliases (used by the zip functions below) for helpers defined elsewhere in this bundle
var map$5 = map$1;
var tail$2 = tail;
/**
 * Combine streams pairwise (or tuple-wise) by index by applying f to values
 * at corresponding indices.  The returned stream ends when any of the input
 * streams ends.
 * @param {function} f function to combine values
 * @returns {Stream} new stream with items at corresponding indices combined
 *  using f
 */
function zip (f /*, ...streams */) {
  return zipArray(f, tail$2(arguments))
}
/**
 * Combine streams pairwise (or tuple-wise) by index by applying f to values
 * at corresponding indices.  The returned stream ends when any of the input
 * streams ends.
 * @param {function} f function to combine values
 * @param {[Stream]} streams streams to zip using f
 * @returns {Stream} new stream with items at corresponding indices combined
 *  using f
 */
function zipArray (f, streams) {
  // 0 streams: empty stream; 1 stream: plain map; otherwise a real Zip
  return streams.length === 0 ? empty$$1()
    : streams.length === 1 ? map$3(f, streams[0])
    : new Stream(new Zip(f, map$5(getSource$2, streams)))
}

// Extract the underlying source of a stream
function getSource$2 (stream) {
  return stream.source
}
/**
 * Source for `zip`: holds the combining function and the input sources.
 * @constructor
 */
function Zip (f, sources) {
  this.f = f;
  this.sources = sources;
}

Zip.prototype.run = function (sink, scheduler) {
  var l = this.sources.length;
  var disposables = new Array(l);
  var sinks = new Array(l);
  var buffers = new Array(l);

  var zipSink = new ZipSink(this.f, buffers, sinks, sink);

  // One buffer and one IndexSink per input
  for (var indexSink, i = 0; i < l; ++i) {
    buffers[i] = new Queue();
    indexSink = sinks[i] = new IndexSink(i, zipSink);
    disposables[i] = this.sources[i].run(indexSink, scheduler);
  }

  return all$1(disposables)
};
/**
 * Sink shared by all inputs of a Zip. Buffers values per input and emits f
 * applied to the front value of every buffer once all buffers are non-empty.
 * @constructor
 */
function ZipSink (f, buffers, sinks, sink) {
  this.f = f;
  this.sinks = sinks;
  this.sink = sink;
  this.buffers = buffers;
}

ZipSink.prototype.event = function (t, indexedValue) { // eslint-disable-line complexity
  var buffers = this.buffers;
  var buffer = buffers[indexedValue.index];

  buffer.push(indexedValue.value);

  // Only a buffer going from empty to non-empty can make a new tuple available
  if (buffer.length() === 1) {
    if (!ready(this.buffers)) {
      return
    }

    emitZipped(this.f, t, buffers, this.sink);

    // End if some input has ended and its buffer is now drained
    if (ended(this.buffers, this.sinks)) {
      this.sink.end(t, void 0);
    }
  }
};

// An input that ends with nothing buffered ends the zip immediately
ZipSink.prototype.end = function (t, indexedValue) {
  var buffer = this.buffers[indexedValue.index];
  if (buffer.isEmpty()) {
    this.sink.end(t, indexedValue.value);
  }
};

ZipSink.prototype.error = Pipe.prototype.error;
// Take the front value of every buffer and emit f applied to them
function emitZipped (f, t, buffers, sink) {
  sink.event(t, invoke(f, map$5(head, buffers)));
}

// Remove and return the front value of a buffer
function head (buffer) {
  return buffer.shift()
}

// True if any input has an empty buffer and its sink is no longer active
function ended (buffers, sinks) {
  for (var i = 0, l = buffers.length; i < l; ++i) {
    if (buffers[i].isEmpty() && !sinks[i].active) {
      return true
    }
  }
  return false
}

// True if every buffer holds at least one value
function ready (buffers) {
  for (var i = 0, l = buffers.length; i < l; ++i) {
    if (buffers[i].isEmpty()) {
      return false
    }
  }
  return true
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Given a stream of streams, return a new stream that adopts the behavior
 * of the most recent inner stream.
 * @param {Stream} stream of streams on which to switch
 * @returns {Stream} switching stream
 */
function switchLatest (stream) {
  return new Stream(new Switch(stream.source))
}
/**
 * Source for `switchLatest`.
 * @constructor
 */
function Switch (source) {
  this.source = source;
}

Switch.prototype.run = function (sink, scheduler) {
  var switchSink = new SwitchSink(sink, scheduler);
  // Dispose both the switch sink (its current inner stream) and the outer run
  return all$1([switchSink, this.source.run(switchSink, scheduler)])
};
/**
 * Sink/disposable for the outer stream of `switchLatest`. Each outer event is
 * a stream; it replaces the currently running inner stream.
 * @constructor
 */
function SwitchSink (sink, scheduler) {
  this.sink = sink;
  this.scheduler = scheduler;
  this.current = null;
  this.ended = false;
}

SwitchSink.prototype.event = function (t, stream) {
  this._disposeCurrent(t); // TODO: capture the result of this dispose
  this.current = new Segment(t, Infinity, this, this.sink);
  this.current.disposable = stream.source.run(this.current, this.scheduler);
};

SwitchSink.prototype.end = function (t, x) {
  this.ended = true;
  this._checkEnd(t, x);
};

SwitchSink.prototype.error = function (t, e) {
  this.ended = true;
  this.sink.error(t, e);
};

SwitchSink.prototype.dispose = function () {
  return this._disposeCurrent(this.scheduler.now())
};

// Returns undefined when there is no current inner stream
SwitchSink.prototype._disposeCurrent = function (t) {
  if (this.current !== null) {
    return this.current._dispose(t)
  }
};

SwitchSink.prototype._disposeInner = function (t, inner) {
  inner._dispose(t); // TODO: capture the result of this dispose
  if (inner === this.current) {
    this.current = null;
  }
};

// End downstream only once the outer stream has ended and no inner stream is running
SwitchSink.prototype._checkEnd = function (t, x) {
  if (this.ended && this.current === null) {
    this.sink.end(t, x);
  }
};

SwitchSink.prototype._endInner = function (t, x, inner) {
  this._disposeInner(t, inner);
  this._checkEnd(t, x);
};

SwitchSink.prototype._errorInner = function (t, e, inner) {
  this._disposeInner(t, inner);
  this.sink.error(t, e);
};
/**
 * Sink for one inner stream of `switchLatest`, valid for the time window
 * [min, max). Times earlier than min are raised to min; events at or after
 * max are dropped.
 * @constructor
 */
function Segment (min, max, outer, sink) {
  this.min = min;
  this.max = max;
  this.outer = outer;
  this.sink = sink;
  this.disposable = empty$1(); // replaced by the creator once the inner stream is running
}

Segment.prototype.event = function (t, x) {
  if (t < this.max) {
    this.sink.event(Math.max(t, this.min), x);
  }
};

Segment.prototype.end = function (t, x) {
  this.outer._endInner(Math.max(t, this.min), x, this);
};

Segment.prototype.error = function (t, e) {
  this.outer._errorInner(Math.max(t, this.min), e, this);
};

// Close the window at t so later events are dropped, then dispose the inner run
Segment.prototype._dispose = function (t) {
  this.max = t;
  tryDispose(t, this.disposable, this.sink);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Retain only items matching a predicate
 * @param {function(x:*):boolean} p filtering predicate called for each item
 * @param {Stream} stream stream to filter
 * @returns {Stream} stream containing only items for which predicate returns truthy
 */
function filter (p, stream) {
  return new Stream(Filter.create(p, stream.source))
}
/**
 * Skip repeated events, using === to detect duplicates
 * @param {Stream} stream stream from which to omit repeated events
 * @returns {Stream} stream without repeated events
 */
function skipRepeats (stream) {
  return skipRepeatsWith(same, stream)
}
* Skip repeated events using the provided equals function to detect duplicates
* @param {function(a:*, b:*):boolean} equals optional function to compare items
* @param {Stream} stream stream from which to omit repeated events
* @returns {Stream} stream without repeated events
function skipRepeatsWith (equals, stream) {
return new Stream(new SkipRepeats(equals, stream.source))
/**
 * Source that drops consecutive duplicate events.
 * @param {function(a:*, b:*):boolean} equals comparison function
 * @param {Object} source upstream source
 */
function SkipRepeats (equals, source) {
  this.equals = equals;
  this.source = source;
}

/**
 * Run the upstream source through a SkipRepeatsSink.
 * @returns {*} whatever the upstream source's run returns
 */
SkipRepeats.prototype.run = function (sink, scheduler) {
  return this.source.run(new SkipRepeatsSink(this.equals, sink), scheduler);
};

/**
 * Sink that remembers the last emitted value and suppresses events that
 * compare equal to it.
 * @param {function(a:*, b:*):boolean} equals comparison function
 * @param {Object} sink downstream sink
 */
function SkipRepeatsSink (equals, sink) {
  this.equals = equals;
  this.sink = sink;
  this.value = void 0;
  // True until the first event arrives; the first event is always emitted.
  this.init = true;
}

SkipRepeatsSink.prototype.end = Pipe.prototype.end;
SkipRepeatsSink.prototype.error = Pipe.prototype.error;

/**
 * Emit x unless it equals the previously emitted value.
 * equals is called as equals(previous, current).
 */
SkipRepeatsSink.prototype.event = function (t, x) {
  if (this.init) {
    this.init = false;
    this.value = x;
    this.sink.event(t, x);
  } else if (!this.equals(this.value, x)) {
    this.value = x;
    this.sink.event(t, x);
  }
};
/**
 * Strict-equality comparison used as the default for skipRepeats.
 * @param {*} a
 * @param {*} b
 * @returns {boolean} true when a === b
 */
function same (a, b) {
  return a === b;
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * @param {number} n
 * @param {Stream} stream
 * @returns {Stream} new stream containing only up to the first n items from stream
 */
function take (n, stream) {
  return slice(0, n, stream);
}

/**
 * @param {number} n
 * @param {Stream} stream
 * @returns {Stream} new stream with the first n items removed
 */
function skip (n, stream) {
  return slice(n, Infinity, stream);
}

/**
 * Slice a stream by index. Negative start/end indexes are not supported
 * @param {number} start
 * @param {number} end
 * @param {Stream} stream
 * @returns {Stream} stream containing items where start <= index < end
 */
function slice (start, end, stream) {
  // An empty or inverted range can never emit anything.
  return end <= start
    ? empty$$1()
    : new Stream(sliceSource(start, end, stream.source));
}
/**
 * Build the source for a slice, rewriting the source graph where possible:
 * a slice of a Map$1 becomes a map of a slice, and a slice of a Slice is
 * fused into a single Slice.
 * @param {number} start
 * @param {number} end
 * @param {Object} source upstream source
 * @returns {Object} new source
 */
function sliceSource (start, end, source) {
  return source instanceof Map$1 ? commuteMapSlice(start, end, source)
    : source instanceof Slice ? fuseSlice(start, end, source)
    : new Slice(start, end, source);
}

/**
 * Move the slice beneath a map: slice(map(f, s)) -> map(f, slice(s)).
 */
function commuteMapSlice (start, end, source) {
  return Map$1.create(source.f, sliceSource(start, end, source.source));
}

/**
 * Combine a slice with an existing Slice source into one Slice.
 * Indexes are relative to the inner slice, so both are shifted by its min
 * and the end is capped at its max.
 */
function fuseSlice (start, end, source) {
  start += source.min;
  end = Math.min(end + source.min, source.max);
  return new Slice(start, end, source.source);
}
/**
 * Source emitting the events of source whose index is in [min, max).
 * @param {number} min index of first event to emit
 * @param {number} max index after the last event to emit
 * @param {Object} source upstream source
 */
function Slice (min, max, source) {
  this.source = source;
  this.min = min;
  this.max = max;
}

/**
 * @returns {SliceSink} the sink, which also acts as the disposable
 */
Slice.prototype.run = function (sink, scheduler) {
  return new SliceSink(this.min, this.max - this.min, this.source, sink, scheduler);
};

/**
 * Sink that skips the first `skip` events, forwards the next `take`
 * events, then ends. Running the source starts in the constructor.
 * @param {number} skip number of events to drop
 * @param {number} take number of events to forward after skipping
 * @param {Object} source upstream source
 * @param {Object} sink downstream sink
 * @param {Object} scheduler
 */
function SliceSink (skip, take, source, sink, scheduler) {
  this.sink = sink;
  this.skip = skip;
  this.take = take;
  // NOTE(review): `once` is defined elsewhere; presumably it guards against
  // disposing more than once - confirm.
  this.disposable = once(source.run(this, scheduler));
}

SliceSink.prototype.end = Pipe.prototype.end;
SliceSink.prototype.error = Pipe.prototype.error;

SliceSink.prototype.event = function (t, x) { // eslint-disable-line complexity
  if (this.skip > 0) {
    this.skip -= 1;
    return;
  }

  // Already took everything; ignore stragglers.
  if (this.take === 0) {
    return;
  }

  this.take -= 1;
  this.sink.event(t, x);
  if (this.take === 0) {
    // Stop the source before signalling end downstream.
    this.dispose();
    this.sink.end(t, x);
  }
};

SliceSink.prototype.dispose = function () {
  return this.disposable.dispose();
};
/**
 * Emit items until the predicate first returns falsy, then end.
 * @param {function(x:*):boolean} p predicate
 * @param {Stream} stream
 * @returns {Stream} stream of the leading items for which p returned truthy
 */
function takeWhile (p, stream) {
  return new Stream(new TakeWhile(p, stream.source));
}

function TakeWhile (p, source) {
  this.p = p;
  this.source = source;
}

/**
 * @returns {TakeWhileSink} the sink, which also acts as the disposable
 */
TakeWhile.prototype.run = function (sink, scheduler) {
  return new TakeWhileSink(this.p, this.source, sink, scheduler);
};

/**
 * Sink that forwards events while p holds. Running the source starts in
 * the constructor.
 */
function TakeWhileSink (p, source, sink, scheduler) {
  this.p = p;
  this.sink = sink;
  this.active = true;
  // NOTE(review): `once` is defined elsewhere; presumably it guards against
  // disposing more than once - confirm.
  this.disposable = once(source.run(this, scheduler));
}

TakeWhileSink.prototype.end = Pipe.prototype.end;
TakeWhileSink.prototype.error = Pipe.prototype.error;

TakeWhileSink.prototype.event = function (t, x) {
  if (!this.active) {
    return;
  }

  var p = this.p;
  this.active = p(x);
  if (this.active) {
    this.sink.event(t, x);
  } else {
    // Predicate failed: stop the source, then end downstream.
    this.dispose();
    this.sink.end(t, x);
  }
};

TakeWhileSink.prototype.dispose = function () {
  return this.disposable.dispose();
};
/**
 * Drop items until the predicate first returns falsy, then emit that item
 * and everything after it.
 * @param {function(x:*):boolean} p predicate
 * @param {Stream} stream
 * @returns {Stream}
 */
function skipWhile (p, stream) {
  return new Stream(new SkipWhile(p, stream.source));
}

function SkipWhile (p, source) {
  this.p = p;
  this.source = source;
}

SkipWhile.prototype.run = function (sink, scheduler) {
  return this.source.run(new SkipWhileSink(this.p, sink), scheduler);
};

function SkipWhileSink (p, sink) {
  this.p = p;
  this.sink = sink;
  this.skipping = true;
}

SkipWhileSink.prototype.end = Pipe.prototype.end;
SkipWhileSink.prototype.error = Pipe.prototype.error;

SkipWhileSink.prototype.event = function (t, x) {
  if (this.skipping) {
    var p = this.p;
    this.skipping = p(x);
    if (this.skipping) {
      // Still inside the skipped prefix: drop this event.
      return;
    }
  }

  // Once skipping is false the predicate is never consulted again.
  this.sink.event(t, x);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Retain only events that occur before the first event of signal.
 * @param {Stream} signal
 * @param {Stream} stream
 * @returns {Stream}
 */
function takeUntil (signal, stream) {
  return new Stream(new Until(signal.source, stream.source));
}

/**
 * Retain only events that occur at or after the first event of signal.
 * @param {Stream} signal
 * @param {Stream} stream
 * @returns {Stream}
 */
function skipUntil (signal, stream) {
  return new Stream(new Since(signal.source, stream.source));
}

/**
 * Retain only events within a time window. The window opens at the first
 * event of timeWindow; the joined inner streams supply the closing signal.
 * @param {Stream<Stream>} timeWindow
 * @param {Stream} stream
 * @returns {Stream}
 */
function during (timeWindow, stream) {
  return takeUntil(join(timeWindow), skipUntil(timeWindow, stream));
}
/**
 * Source that ends when maxSignal first fires.
 */
function Until (maxSignal, source) {
  this.maxSignal = maxSignal;
  this.source = source;
}

Until.prototype.run = function (sink, scheduler) {
  var min = new Bound(-Infinity, sink);
  var max = new UpperBound(this.maxSignal, sink, scheduler);
  var disposable = this.source.run(new TimeWindowSink(min, max, sink), scheduler);

  // NOTE(review): all$1 is defined elsewhere; presumably it combines the
  // three disposables into one - confirm.
  return all$1([min, max, disposable]);
};

/**
 * Source that starts emitting when minSignal first fires.
 */
function Since (minSignal, source) {
  this.minSignal = minSignal;
  this.source = source;
}

Since.prototype.run = function (sink, scheduler) {
  var min = new LowerBound(this.minSignal, sink, scheduler);
  var max = new Bound(Infinity, sink);
  var disposable = this.source.run(new TimeWindowSink(min, max, sink), scheduler);

  return all$1([min, max, disposable]);
};
/**
 * Fixed time bound. Has the sink/disposable shape so it can stand in for
 * LowerBound/UpperBound, but event, end and dispose do nothing.
 * @param {number} value the bound's time
 * @param {Object} sink downstream sink (used for errors)
 */
function Bound (value, sink) {
  this.value = value;
  this.sink = sink;
}

Bound.prototype.error = Pipe.prototype.error;
Bound.prototype.event = noop$1$1;
Bound.prototype.end = noop$1$1;
Bound.prototype.dispose = noop$1$1;

/**
 * Sink that forwards only events whose time t satisfies
 * min.value <= t < max.value. The bounds are read on every event, so they
 * may change while the stream runs.
 */
function TimeWindowSink (min, max, sink) {
  this.min = min;
  this.max = max;
  this.sink = sink;
}

TimeWindowSink.prototype.event = function (t, x) {
  if (t >= this.min.value && t < this.max.value) {
    this.sink.event(t, x);
  }
};

TimeWindowSink.prototype.error = Pipe.prototype.error;
TimeWindowSink.prototype.end = Pipe.prototype.end;
/**
 * Lower time bound driven by a signal source: value starts at Infinity and
 * drops to the time of the earliest signal event. The signal is run
 * immediately in the constructor.
 */
function LowerBound (signal, sink, scheduler) {
  this.value = Infinity;
  this.sink = sink;
  this.disposable = signal.run(this, scheduler);
}

LowerBound.prototype.event = function (t /*, x */) {
  if (t < this.value) {
    this.value = t;
  }
};

LowerBound.prototype.end = noop$1$1;
LowerBound.prototype.error = Pipe.prototype.error;

LowerBound.prototype.dispose = function () {
  return this.disposable.dispose();
};

/**
 * Upper time bound driven by a signal source: on the signal event that
 * lowers value, the downstream sink is ended. The signal is run
 * immediately in the constructor.
 */
function UpperBound (signal, sink, scheduler) {
  this.value = Infinity;
  this.sink = sink;
  this.disposable = signal.run(this, scheduler);
}

UpperBound.prototype.event = function (t, x) {
  if (t < this.value) {
    this.value = t;
    this.sink.end(t, x);
  }
};

UpperBound.prototype.end = noop$1$1;
UpperBound.prototype.error = Pipe.prototype.error;

UpperBound.prototype.dispose = function () {
  return this.disposable.dispose();
};

/** Shared do-nothing function for unused sink/disposable methods. */
function noop$1$1 () {}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * @param {Number} delayTime milliseconds to delay each item
 * @param {Stream} stream
 * @returns {Stream} new stream containing the same items, but delayed by ms
 */
function delay$1 (delayTime, stream) {
  // A non-positive delay is a no-op, so the input stream is returned as-is.
  return delayTime <= 0 ? stream
    : new Stream(new Delay$1(delayTime, stream.source));
}

function Delay$1 (dt, source) {
  this.dt = dt;
  this.source = source;
}

Delay$1.prototype.run = function (sink, scheduler) {
  var delaySink = new DelaySink(this.dt, sink, scheduler);
  // The DelaySink is included as a disposable so pending delayed tasks are
  // cancelled together with the source.
  return all$1([delaySink, this.source.run(delaySink, scheduler)]);
};

/**
 * Sink that re-schedules each event and the end signal dt later.
 */
function DelaySink (dt, sink, scheduler) {
  this.dt = dt;
  this.sink = sink;
  this.scheduler = scheduler;
}

/**
 * Cancel every scheduled task that targets this sink's downstream sink.
 */
DelaySink.prototype.dispose = function () {
  var self = this;
  this.scheduler.cancelAll(function (task) {
    return task.sink === self.sink;
  });
};

DelaySink.prototype.event = function (t, x) {
  this.scheduler.delay(this.dt, PropagateTask.event(x, this.sink));
};

DelaySink.prototype.end = function (t, x) {
  this.scheduler.delay(this.dt, PropagateTask.end(x, this.sink));
};

// Errors are not delayed.
DelaySink.prototype.error = Pipe.prototype.error;
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Expose event timestamps: each value x becomes { time: t, value: x }.
 * @param {Stream} stream
 * @returns {Stream<{time:number, value:*}>}
 */
function timestamp (stream) {
  return new Stream(new Timestamp(stream.source));
}

function Timestamp (source) {
  this.source = source;
}

Timestamp.prototype.run = function (sink, scheduler) {
  return this.source.run(new TimestampSink(sink), scheduler);
};

function TimestampSink (sink) {
  this.sink = sink;
}

TimestampSink.prototype.end = Pipe.prototype.end;
TimestampSink.prototype.error = Pipe.prototype.error;

TimestampSink.prototype.event = function (t, x) {
  this.sink.event(t, { time: t, value: x });
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Limit the rate of events by suppressing events that occur too often
 * @param {Number} period time to suppress events
 * @param {Stream} stream
 * @returns {Stream}
 */
function throttle (period, stream) {
  return new Stream(throttleSource(period, stream.source));
}

/**
 * Build the throttled source, rewriting the source graph where possible:
 * a throttle of a Map$1 becomes a map of a throttle, and nested throttles
 * are fused into one.
 */
function throttleSource (period, source) {
  return source instanceof Map$1 ? commuteMapThrottle(period, source)
    : source instanceof Throttle ? fuseThrottle(period, source)
    : new Throttle(period, source);
}

function commuteMapThrottle (period, source) {
  return Map$1.create(source.f, throttleSource(period, source.source));
}

/** Two stacked throttles behave like one with the larger period. */
function fuseThrottle (period, source) {
  return new Throttle(Math.max(period, source.period), source.source);
}

function Throttle (period, source) {
  this.period = period;
  this.source = source;
}

Throttle.prototype.run = function (sink, scheduler) {
  return this.source.run(new ThrottleSink(this.period, sink), scheduler);
};

/**
 * Sink that forwards an event only when its time has reached `time`,
 * the earliest moment the next event is allowed.
 */
function ThrottleSink (period, sink) {
  this.time = 0;
  this.period = period;
  this.sink = sink;
}

ThrottleSink.prototype.event = function (t, x) {
  if (t >= this.time) {
    this.time = t + this.period;
    this.sink.event(t, x);
  }
};

ThrottleSink.prototype.end = Pipe.prototype.end;
ThrottleSink.prototype.error = Pipe.prototype.error;
/**
 * Wait for a burst of events to subside and emit only the last event in the burst
 * @param {Number} period events occurring more frequently than this
 *  will be suppressed
 * @param {Stream} stream stream to debounce
 * @returns {Stream} new debounced stream
 */
function debounce (period, stream) {
  return new Stream(new Debounce(period, stream.source));
}

function Debounce (dt, source) {
  this.dt = dt;
  this.source = source;
}

/**
 * @returns {DebounceSink} the sink; its dispose only clears the pending
 *  timer (the combined disposable is stored on sink.disposable)
 */
Debounce.prototype.run = function (sink, scheduler) {
  return new DebounceSink(this.dt, this.source, sink, scheduler);
};

/**
 * Sink that holds the latest value and emits it once dt has passed without
 * a newer event. Running the source starts in the constructor.
 */
function DebounceSink (dt, source, sink, scheduler) {
  this.dt = dt;
  this.sink = sink;
  this.scheduler = scheduler;
  this.value = void 0;
  this.timer = null;

  var sourceDisposable = source.run(this, scheduler);
  this.disposable = all$1([this, sourceDisposable]);
}

DebounceSink.prototype.event = function (t, x) {
  // A newer event supersedes any pending emission.
  this._clearTimer();
  this.value = x;
  this.timer = this.scheduler.delay(this.dt, PropagateTask.event(x, this.sink));
};

DebounceSink.prototype.end = function (t, x) {
  // Flush the pending value immediately rather than losing it.
  if (this._clearTimer()) {
    this.sink.event(t, this.value);
    this.value = void 0;
  }
  this.sink.end(t, x);
};

DebounceSink.prototype.error = function (t, x) {
  this._clearTimer();
  this.sink.error(t, x);
};

DebounceSink.prototype.dispose = function () {
  this._clearTimer();
};

/**
 * Cancel the pending emission, if any.
 * @returns {boolean} true when a timer was pending and has been cancelled
 */
DebounceSink.prototype._clearTimer = function () {
  if (this.timer === null) {
    return false;
  }
  // NOTE(review): assumes the object returned by scheduler.delay exposes
  // dispose() - confirm against the scheduler implementation.
  this.timer.dispose();
  this.timer = null;
  return true;
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Create a stream containing only the promise's fulfillment
 * value at the time it fulfills.
 * @param {Promise<T>} p promise
 * @return {Stream<T>} stream containing promise's fulfillment value.
 *  If the promise rejects, the stream will error
 */
function fromPromise (p) {
  return awaitPromises(of(p));
}

/**
 * Turn a Stream<Promise<T>> into Stream<T> by awaiting each promise.
 * Event order is preserved.
 * @param {Stream<Promise<T>>} stream
 * @return {Stream<T>} stream of fulfillment values. The stream will
 *  error if any promise rejects.
 */
function awaitPromises (stream) {
  return new Stream(new Await(stream.source));
}

function Await (source) {
  this.source = source;
}

Await.prototype.run = function (sink, scheduler) {
  return this.source.run(new AwaitSink(sink, scheduler), scheduler);
};
/**
 * Sink that receives promises and forwards their fulfillment values in
 * arrival order. Ordering is enforced by chaining every signal onto a
 * single promise queue. Forwarded signals are stamped with the scheduler's
 * time at the moment they are delivered, not the original event time.
 * @param {Object} sink downstream sink
 * @param {Object} scheduler must provide now()
 */
function AwaitSink (sink, scheduler) {
  this.sink = sink;
  this.scheduler = scheduler;
  this.queue = Promise.resolve();
  var self = this;

  // Pre-create closures, to avoid creating them per event
  this._eventBound = function (x) {
    self.sink.event(self.scheduler.now(), x);
  };

  this._endBound = function (x) {
    self.sink.end(self.scheduler.now(), x);
  };

  this._errorBound = function (e) {
    self.sink.error(self.scheduler.now(), e);
  };
}

AwaitSink.prototype.event = function (t, promise) {
  var self = this;
  // A rejection is routed to sink.error and leaves the queue fulfilled, so
  // later signals are still processed.
  this.queue = this.queue.then(function () {
    return self._event(promise);
  }).catch(this._errorBound);
};

AwaitSink.prototype.end = function (t, x) {
  var self = this;
  this.queue = this.queue.then(function () {
    return self._end(x);
  }).catch(this._errorBound);
};

AwaitSink.prototype.error = function (t, e) {
  var self = this;
  // Don't resolve error values, propagate directly
  // NOTE(review): upstream most.js is believed to append a fatal-error
  // handler here so a throwing sink.error is rethrown asynchronously; that
  // helper is not visible in this part of the file - confirm and restore.
  this.queue = this.queue.then(function () {
    return self._errorBound(e);
  });
};

AwaitSink.prototype._event = function (promise) {
  return promise.then(this._eventBound);
};

AwaitSink.prototype._end = function (x) {
  // The end value may itself be a promise; resolve it before ending.
  return Promise.resolve(x).then(this._endBound);
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * Sink wrapper that can be switched off. While active it forwards event
 * and end; after end, error or disable() it drops further event/end
 * signals. Errors are always forwarded.
 * @param {Object} sink downstream sink
 */
function SafeSink (sink) {
  this.sink = sink;
  this.active = true;
}

SafeSink.prototype.event = function (t, x) {
  if (!this.active) {
    return;
  }
  this.sink.event(t, x);
};

SafeSink.prototype.end = function (t, x) {
  if (!this.active) {
    return;
  }
  // Deactivate first so nothing can follow the end signal.
  this.disable();
  this.sink.end(t, x);
};

SafeSink.prototype.error = function (t, e) {
  this.disable();
  this.sink.error(t, e);
};

/**
 * Deactivate this wrapper.
 * @returns {Object} the wrapped sink, so the caller can take it over
 */
SafeSink.prototype.disable = function () {
  this.active = false;
  return this.sink;
};
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
/**
 * If stream encounters an error, recover and continue with items from stream
 * returned by f.
 * @param {function(error:*):Stream} f function which returns a new stream
 * @param {Stream} stream
 * @returns {Stream} new stream which will recover from an error by calling f
 */
function recoverWith (f, stream) {
  return new Stream(new RecoverWith(f, stream.source));
}

// Alias of recoverWith under its older name.
var flatMapError = recoverWith;

/**
 * Create a stream containing only an error
 * @param {*} e error value, preferably an Error or Error subtype
 * @returns {Stream} new stream containing only an error
 */
function throwError (e) {
  return new Stream(new ErrorSource(e));
}

function ErrorSource (e) {
  this.value = e;
}

/**
 * Schedule the error for delivery via scheduler.asap instead of calling
 * sink.error synchronously.
 */
ErrorSource.prototype.run = function (sink, scheduler) {
  return scheduler.asap(new PropagateTask(runError, this.value, sink));
};

function runError (t, e, sink) {
  sink.error(t, e);
}

function RecoverWith (f, source) {
  this.f = f;
  this.source = source;
}

/**
 * @returns {RecoverWithSink} the sink, which also acts as the disposable
 */
RecoverWith.prototype.run = function (sink, scheduler) {
  return new RecoverWithSink(this.f, this.source, sink, scheduler);
};
/**
 * Sink that, on the first error, switches to the stream returned by f.
 * The downstream sink is wrapped in a SafeSink so the failed source can be
 * cut off before the replacement starts. Running the source starts in the
 * constructor.
 * @param {function(error:*):Stream} f produces the replacement stream
 * @param {Object} source upstream source
 * @param {Object} sink downstream sink
 * @param {Object} scheduler
 */
function RecoverWithSink (f, source, sink, scheduler) {
  this.f = f;
  this.sink = new SafeSink(sink);
  this.scheduler = scheduler;
  this.disposable = source.run(this, scheduler);
}

RecoverWithSink.prototype.event = function (t, x) {
  tryEvent(t, x, this.sink);
};

RecoverWithSink.prototype.end = function (t, x) {
  tryEnd(t, x, this.sink);
};

RecoverWithSink.prototype.error = function (t, e) {
  // Disable the SafeSink and take over the raw downstream sink for the
  // replacement stream.
  var nextSink = this.sink.disable();

  tryDispose(t, this.disposable, this.sink);
  this._startNext(t, e, nextSink);
};

/**
 * Start the replacement stream; if f (or running its stream) throws, the
 * thrown error is reported downstream.
 */
RecoverWithSink.prototype._startNext = function (t, x, sink) {
  try {
    this.disposable = this._continue(this.f, x, sink);
  } catch (e) {
    sink.error(t, e);
  }
};

RecoverWithSink.prototype._continue = function (f, x, sink) {
  var stream = f(x);
  return stream.source.run(sink, this.scheduler);
};

RecoverWithSink.prototype.dispose = function () {
  return this.disposable.dispose();
};
/**
 * Disposable handed to each observer of a MulticastSource.
 * @param {Object} source multicast source; must provide remove(sink) and _dispose()
 * @param {Object} sink the observer's sink
 */
var MulticastDisposable = function MulticastDisposable (source, sink) {
  this.source = source;
  this.sink = sink;
  this.disposed = false;
};

/**
 * Remove this observer's sink from the source; when it was the last one,
 * dispose the source. Calling it again is a no-op.
 * @returns {*} undefined when already disposed, false when other sinks
 *  remain, otherwise the result of source._dispose()
 */
MulticastDisposable.prototype.dispose = function dispose () {
  if (this.disposed) {
    return;
  }
  this.disposed = true;
  var remaining = this.source.remove(this.sink);
  return remaining === 0 && this.source._dispose();
};
/**
 * Deliver an event to sink; if the sink throws, report the thrown error
 * to the same sink.
 * @param {number} t event time
 * @param {*} x event value
 * @param {Object} sink
 */
function tryEvent$1 (t, x, sink) {
  try {
    sink.event(t, x);
  } catch (e) {
    sink.error(t, e);
  }
}

/**
 * Deliver an end signal to sink; if the sink throws, report the thrown
 * error to the same sink.
 * @param {number} t end time
 * @param {*} x end value
 * @param {Object} sink
 */
function tryEnd$1 (t, x, sink) {
  try {
    sink.end(t, x);
  } catch (e) {
    sink.error(t, e);
  }
}
/** Call dispose() on a disposable and return its result. */
var dispose = function (disposable) { return disposable.dispose(); };

/** Disposable whose dispose does nothing; used before a source is running. */
var emptyDisposable = {
  dispose: function dispose$1 () {}
};
/**
 * Source that shares one run of the underlying source among many sinks.
 * It is also the sink the underlying source runs into, and fans each
 * signal out to the registered sinks.
 * @param {Object} source underlying source
 */
var MulticastSource = function MulticastSource (source) {
  this.source = source;
  this.sinks = [];
  this._disposable = emptyDisposable;
};

/**
 * Register sink; the underlying source is started only when the first
 * sink is added.
 * @returns {MulticastDisposable} disposable that unregisters this sink
 */
MulticastSource.prototype.run = function run (sink, scheduler) {
  var n = this.add(sink);
  if (n === 1) {
    this._disposable = this.source.run(this, scheduler);
  }
  return new MulticastDisposable(this, sink);
};

/**
 * Dispose the underlying source's run and reset to the empty disposable.
 * @returns {Promise} promise for the result of disposing
 */
MulticastSource.prototype._dispose = function _dispose () {
  var disposable = this._disposable;
  this._disposable = emptyDisposable;
  return Promise.resolve(disposable).then(dispose);
};

/**
 * @returns {number} number of sinks after adding
 */
MulticastSource.prototype.add = function add (sink) {
  this.sinks = append(sink, this.sinks);
  return this.sinks.length;
};

/**
 * @returns {number} number of sinks remaining after removal
 */
MulticastSource.prototype.remove = function remove$1 (sink) {
  var i = findIndex(sink, this.sinks);
  // istanbul ignore next
  if (i >= 0) {
    this.sinks = remove(i, this.sinks);
  }

  return this.sinks.length;
};

MulticastSource.prototype.event = function event (time, value) {
  var s = this.sinks;
  // Single-sink fast path: deliver directly, without the try/catch wrapper.
  if (s.length === 1) {
    return s[0].event(time, value);
  }
  for (var i = 0; i < s.length; ++i) {
    tryEvent$1(time, value, s[i]);
  }
};

MulticastSource.prototype.end = function end (time, value) {
  var s = this.sinks;
  for (var i = 0; i < s.length; ++i) {
    tryEnd$1(time, value, s[i]);
  }
};

MulticastSource.prototype.error = function error (time, err) {
  var s = this.sinks;
  for (var i = 0; i < s.length; ++i) {
    s[i].error(time, err);
  }
};
/**
 * Make a stream multicast. A stream whose source is already a
 * MulticastSource is returned unchanged; otherwise a new stream is built
 * with the same constructor as the input.
 * @param {Stream} stream
 * @returns {Stream} multicast stream
 */
function multicast (stream) {
  var source = stream.source;
  return source instanceof MulticastSource
    ? stream
    : new stream.constructor(new MulticastSource(source));
}
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */
// Add of and empty to constructor for fantasy-land compat
Stream.of = of;
Stream.empty = empty$$1;
// Add from to constructor for ES Observable compat
Stream.from = from;

// -----------------------------------------------------------------------
// Draft ES Observable proposal interop

/**
 * Subscribe to this stream.
 * @param {*} subscriber passed through to subscribe()
 * @returns {*} whatever subscribe() returns
 */
Stream.prototype.subscribe = function (subscriber) {
  return subscribe(subscriber, this);
};

/**
 * Interop hook keyed by index$1: returns this stream itself.
 * NOTE(review): index$1 is presumably the Symbol.observable ponyfill - confirm.
 */
Stream.prototype[index$1] = function () {
  return this;
};

// -----------------------------------------------------------------------
// Fluent adapter

/**
 * Adapt a functional stream transform to fluent style.
 * It applies f to the this stream object
 * @param {function(s: Stream): Stream} f function that
 * receives the stream itself and must return a new stream
 * @return {Stream}
 */
Stream.prototype.thru = function (f) {
  return thru(f, this);
};
// -----------------------------------------------------------------------
// Observing

/**
 * Process all the events in the stream
 * @param {function(x:*):*} f called for each event
 * @returns {Promise} promise that fulfills when the stream ends, or rejects
 *  if the stream fails with an unhandled error.
 */
Stream.prototype.observe = Stream.prototype.forEach = function (f) {
  return observe(f, this);
};

/**
 * Consume all events in the stream, without providing a function to process each.
 * This causes a stream to become active and begin emitting events, and is useful
 * in cases where all processing has been setup upstream via other combinators, and
 * there is no need to process the terminal events.
 * @returns {Promise} promise that fulfills when the stream ends, or rejects
 *  if the stream fails with an unhandled error.
 */
Stream.prototype.drain = function () {
  return drain(this);
};

// -------------------------------------------------------

/**
 * Generalized feedback loop. Call a stepper function for each event. The stepper
 * will be called with 2 params: the current seed and an event value. It must
 * return a new { seed, value } pair. The `seed` will be fed back into the next
 * invocation of stepper, and the `value` will be propagated as the event value.
 * @param {function(seed:*, value:*):{seed:*, value:*}} stepper loop step function
 * @param {*} seed initial seed value passed to first stepper call
 * @returns {Stream} new stream whose values are the `value` field of the objects
 * returned by the stepper
 */
Stream.prototype.loop = function (stepper, seed) {
  return loop(stepper, seed, this);
};
// -------------------------------------------------------

/**
 * Create a stream containing successive reduce results of applying f to
 * the previous reduce result and the current stream item.
 * @param {function(result:*, x:*):*} f reducer function
 * @param {*} initial initial value
 * @returns {Stream} new stream containing successive reduce results
 */
Stream.prototype.scan = function (f, initial) {
  return scan(f, initial, this);
};

/**
 * Reduce the stream to produce a single result. Note that reducing an infinite
 * stream will return a Promise that never fulfills, but that may reject if an error
 * occurs.
 * @param {function(result:*, x:*):*} f reducer function
 * @param {*} initial optional initial value
 * @returns {Promise} promise for the final result of the reduce
 */
Stream.prototype.reduce = function (f, initial) {
  return reduce$1(f, initial, this);
};

/**
 * @param {Stream} tail
 * @returns {Stream} new stream containing all items in this followed by
 *  all items in tail
 */
Stream.prototype.concat = function (tail$$1) {
  return concat(this, tail$$1);
};

/**
 * @param {*} x value to prepend
 * @returns {Stream} a new stream with x prepended
 */
Stream.prototype.startWith = function (x) {
  return cons$1(x, this);
};
// -----------------------------------------------------------------------
// Transforming

/**
 * Transform each value in the stream by applying f to each
 * @param {function(*):*} f mapping function
 * @returns {Stream} stream containing items transformed by f
 */
Stream.prototype.map = function (f) {
  return map$3(f, this);
};

/**
 * Assume this stream contains functions, and apply each function to each item
 * in the provided stream. This generates, in effect, a cross product.
 * @param {Stream} xs stream of items to which each function is applied
 * @returns {Stream} stream containing the cross product of items
 */
Stream.prototype.ap = function (xs) {
  return ap(this, xs);
};

/**
 * Replace each value in the stream with x
 * @param {*} x
 * @returns {Stream} stream containing items replaced with x
 */
Stream.prototype.constant = function (x) {
  return constant(x, this);
};

/**
 * Perform a side effect for each item in the stream
 * @param {function(x:*):*} f side effect to execute for each item. The
 *  return value will be discarded.
 * @returns {Stream} new stream containing the same items as this stream
 */
Stream.prototype.tap = function (f) {
  return tap(f, this);
};

// -----------------------------------------------------------------------
// Transducer support

/**
 * Transform this stream by passing its events through a transducer.
 * @param {function} transducer transducer function
 * @return {Stream} stream of events transformed by the transducer
 */
Stream.prototype.transduce = function (transducer) {
  return transduce(transducer, this);
};
// -----------------------------------------------------------------------
// FlatMapping

/**
 * Map each value in the stream to a new stream, and merge it into the
 * returned outer stream. Event arrival times are preserved.
 * @param {function(x:*):Stream} f chaining function, must return a Stream
 * @returns {Stream} new stream containing all events from each stream returned by f
 */
Stream.prototype.chain = function (f) {
  return flatMap(f, this);
};

// @deprecated use chain instead
Stream.prototype.flatMap = Stream.prototype.chain;

/**
 * Monadic join. Flatten a Stream<Stream<X>> to Stream<X> by merging inner
 * streams to the outer. Event arrival times are preserved.
 * @returns {Stream<X>} new stream containing all events of all inner streams
 */
Stream.prototype.join = function () {
  return join(this);
};

/**
 * Map the end event to a new stream, and begin emitting its values.
 * @param {function(x:*):Stream} f function that receives the end event value,
 * and *must* return a new Stream to continue with.
 * @returns {Stream} new stream that emits all events from the original stream,
 * followed by all events from the stream returned by f.
 */
Stream.prototype.continueWith = function (f) {
  return continueWith(f, this);
};

// @deprecated use continueWith instead
Stream.prototype.flatMapEnd = Stream.prototype.continueWith;

/**
 * Fluent form of concatMap(f, stream).
 * @param {function(x:*):Stream} f
 * @returns {Stream}
 */
Stream.prototype.concatMap = function (f) {
  return concatMap(f, this);
};
// -----------------------------------------------------------------------
// Concurrent merging

/**
 * Flatten a Stream<Stream<X>> to Stream<X> by merging inner
 * streams to the outer, limiting the number of inner streams that may
 * be active concurrently.
 * @param {number} concurrency at most this many inner streams will be
 *  allowed to be active concurrently.
 * @return {Stream<X>} new stream containing all events of all inner
 *  streams, with limited concurrency.
 */
Stream.prototype.mergeConcurrently = function (concurrency) {
  return mergeConcurrently(concurrency, this);
};

// -----------------------------------------------------------------------
// Merging

/**
 * Merge this stream and all the provided streams
 * @returns {Stream} stream containing items from this stream and s in time
 * order.  If two events are simultaneous they will be merged in
 * arbitrary order.
 */
Stream.prototype.merge = function (/* ...streams*/) {
  // Prepend this stream to the argument list.
  return mergeArray(cons(this, arguments));
};

// -----------------------------------------------------------------------
// Combining

/**
 * Combine latest events from all input streams
 * @param {function(...events):*} f function to combine most recent events
 * @returns {Stream} stream containing the result of applying f to the most recent
 *  event of each input stream, whenever a new event arrives on any stream.
 */
Stream.prototype.combine = function (f /*, ...streams*/) {
  // arguments[0] is f; replacing it with this stream yields the stream list.
  return combineArray(f, replace(this, 0, arguments));
};
// -----------------------------------------------------------------------
// Sampling

/**
 * When an event arrives on sampler, emit the latest event value from stream.
 * @param {Stream} sampler stream of events at whose arrival time
 *  signal's latest value will be propagated
 * @returns {Stream} sampled stream of values
 */
Stream.prototype.sampleWith = function (sampler) {
  return sampleWith(sampler, this);
};

/**
 * When an event arrives on this stream, emit the result of calling f with the latest
 * values of all streams being sampled
 * @param {function(...values):*} f function to apply to each set of sampled values
 * @returns {Stream} stream of sampled and transformed values
 */
Stream.prototype.sample = function (f /* ...streams */) {
  return sampleArray(f, this, tail(arguments));
};

// -----------------------------------------------------------------------
// Zipping

/**
 * Pair-wise combine items with those in s. Given 2 streams:
 * [1,2,3] zipWith f [4,5,6] -> [f(1,4),f(2,5),f(3,6)]
 * Note: zip causes fast streams to buffer and wait for slow streams.
 * @param {function(a:Stream, b:Stream, ...):*} f function to combine items
 * @returns {Stream} new stream containing pairs
 */
Stream.prototype.zip = function (f /*, ...streams*/) {
  // arguments[0] is f; replacing it with this stream yields the stream list.
  return zipArray(f, replace(this, 0, arguments));
};

// -----------------------------------------------------------------------
// Switching

/**
 * Given a stream of streams, return a new stream that adopts the behavior
 * of the most recent inner stream.
 * @returns {Stream} switching stream
 */
Stream.prototype.switchLatest = function () {
  return switchLatest(this);
};

// @deprecated use switchLatest instead
Stream.prototype.switch = Stream.prototype.switchLatest;
// -----------------------------------------------------------------------
// Filtering

/**
 * Retain only items matching a predicate
 * stream:                           -12345678-
 * filter(x => x % 2 === 0, stream): --2-4-6-8-
 * @param {function(x:*):boolean} p filtering predicate called for each item
 * @returns {Stream} stream containing only items for which predicate returns truthy
 */
Stream.prototype.filter = function (p) {
  return filter(p, this);
};

/**
 * Skip repeated events, using === to compare items
 * stream:              -abbcd-
 * skipRepeats(stream): -ab-cd-
 * @returns {Stream} stream with no repeated events
 */
Stream.prototype.skipRepeats = function () {
  return skipRepeats(this);
};

/**
 * Skip repeated events, using supplied equals function to compare items
 * @param {function(a:*, b:*):boolean} equals function to compare items
 * @returns {Stream} stream with no repeated events
 */
Stream.prototype.skipRepeatsWith = function (equals) {
  return skipRepeatsWith(equals, this);
};
// -----------------------------------------------------------------------
// Slicing

/**
 * stream:          -abcd-
 * take(2, stream): -ab|
 * @param {Number} n take up to this many events
 * @returns {Stream} stream containing at most the first n items from this stream
 */
Stream.prototype.take = function (n) {
  return take(n, this);
};

/**
 * stream:          -abcd->
 * skip(2, stream): ---cd->
 * @param {Number} n skip this many events
 * @returns {Stream} stream not containing the first n events
 */
Stream.prototype.skip = function (n) {
  return skip(n, this);
};

/**
 * Slice a stream by event index. Equivalent to, but more efficient than
 * stream.take(end).skip(start);
 * NOTE: Negative start and end are not supported
 * @param {Number} start skip all events before the start index
 * @param {Number} end allow all events from the start index to the end index
 * @returns {Stream} stream containing items where start <= index < end
 */
Stream.prototype.slice = function (start, end) {
  return slice(start, end, this);
};

/**
 * stream:                        -123451234->
 * takeWhile(x => x < 5, stream): -1234|
 * @param {function(x:*):boolean} p predicate
 * @returns {Stream} stream containing items up to, but not including, the
 * first item for which p returns falsy.
 */
Stream.prototype.takeWhile = function (p) {
  return takeWhile(p, this);
};

/**
 * stream:                        -123451234->
 * skipWhile(x => x < 5, stream): -----51234->
 * @param {function(x:*):boolean} p predicate
 * @returns {Stream} stream containing items following *and including* the
 * first item for which p returns falsy.
 */
Stream.prototype.skipWhile = function (p) {
  return skipWhile(p, this);
};
// -----------------------------------------------------------------------
// Time slicing

/**
 * stream:                    -a-b-c-d-e-f-g->
 * signal:                    -------x
 * takeUntil(signal, stream): -a-b-c-|
 * @param {Stream} signal retain only events in stream before the first
 * event in signal
 * @returns {Stream} new stream containing only events that occur before
 * the first event in signal.
 */
Stream.prototype.until = function (signal) {
  return takeUntil(signal, this);
};

// @deprecated use until instead
Stream.prototype.takeUntil = Stream.prototype.until;

/**
 * stream:                    -a-b-c-d-e-f-g->
 * signal:                    -------x
 * skipUntil(signal, stream): -------d-e-f-g->
 * @param {Stream} signal retain only events in stream at or after the first
 * event in signal
 * @returns {Stream} new stream containing only events that occur after
 * the first event in signal.
 */
Stream.prototype.since = function (signal) {
  return skipUntil(signal, this);
};

// @deprecated use since instead
Stream.prototype.skipUntil = Stream.prototype.since;

/**
 * stream:                    -a-b-c-d-e-f-g->
 * timeWindow:                -----s
 * s:                               -----t
 * stream.during(timeWindow): -----c-d-e-|
 * @param {Stream<Stream>} timeWindow a stream whose first event (s) represents
 *  the window start time.  That event (s) is itself a stream whose first event (t)
 *  represents the window end time
 * @returns {Stream} new stream containing only events within the provided timespan
 */
Stream.prototype.during = function (timeWindow) {
  return during(timeWindow, this);
};
// -----------------------------------------------------------------------
// Delaying

/**
 * @param {Number} delayTime milliseconds to delay each item
 * @returns {Stream} new stream containing the same items, but delayed by ms
 */
Stream.prototype.delay = function (delayTime) {
  return delay$1(delayTime, this);
};

// -----------------------------------------------------------------------
// Getting event timestamp

/**
 * Expose event timestamps into the stream. Turns a Stream<X> into
 * Stream<{time:t, value:X}>
 * @returns {Stream<{time:number, value:*}>}
 */
Stream.prototype.timestamp = function () {
  return timestamp(this);
};

// -----------------------------------------------------------------------
// Rate limiting

/**
 * Limit the rate of events
 * stream:              abcd----abcd----
 * throttle(2, stream): a-c-----a-c-----
 * @param {Number} period time to suppress events
 * @returns {Stream} new stream that skips events for throttle period
 */
Stream.prototype.throttle = function (period) {
  return throttle(period, this);
};

/**
 * Wait for a burst of events to subside and emit only the last event in the burst
 * stream:              abcd----abcd----
 * debounce(2, stream): -----d-------d--
 * @param {Number} period events occurring more frequently than this
 *  on the provided scheduler will be suppressed
 * @returns {Stream} new debounced stream
 */
Stream.prototype.debounce = function (period) {
  return debounce(period, this);
};
// -----------------------------------------------------------------------
// Awaiting Promises
/**
 * Await promises, turning a Stream<Promise<X>> into Stream<X>.  Preserves
 * event order, but timeshifts events based on promise resolution time.
 * @returns {Stream<X>} stream containing non-promise values
 */
Stream.prototype.awaitPromises = function () {
  // Fluent form of awaitPromises(stream): `this` is passed as the stream argument.
  return awaitPromises(this)
};

// @deprecated use awaitPromises instead
Stream.prototype.await = Stream.prototype.awaitPromises;
// -----------------------------------------------------------------------
// Error handling
/**
 * If this stream encounters an error, recover and continue with items from stream
 * returned by f.
 * stream:                  -a-b-c-X-
 * f(X):                           d-e-f-g-
 * flatMapError(f, stream): -a-b-c-d-e-f-g-
 * @param {function(error:*):Stream} f function which returns a new stream
 * @returns {Stream} new stream which will recover from an error by calling f
 */
Stream.prototype.recoverWith = function (f) {
  // Delegates to the flatMapError combinator, passing `this` as the stream argument.
  return flatMapError(f, this)
};

// @deprecated use recoverWith instead
Stream.prototype.flatMapError = Stream.prototype.recoverWith;
// -----------------------------------------------------------------------
// Multicasting
/**
 * Transform the stream into multicast stream.  That means that many subscribers
 * to the stream will not cause multiple invocations of the internal machinery.
 * @returns {Stream} new stream which will multicast events to all observers.
 */
Stream.prototype.multicast = function () {
  // Fluent form of multicast(stream): `this` is passed as the stream argument.
  return multicast(this)
};
// export the instance of the defaultScheduler for third-party libraries
// export an implementation of Task used internally for third-party libraries
exports.Stream = Stream;
exports.of = of;
exports.just = of;
exports.empty = empty$$1;
exports.never = never$1;
exports.from = from;
exports.periodic = periodic;
exports.observe = observe;
exports.forEach = observe;
exports.drain = drain;
exports.loop = loop;
exports.scan = scan;
exports.reduce = reduce$1;
exports.concat = concat;
exports.startWith = cons$1; = map$3;
exports.constant = constant;
exports.tap = tap;
exports.ap = ap;
exports.transduce = transduce;
exports.flatMap = flatMap;
exports.chain = flatMap;
exports.join = join;
exports.continueWith = continueWith;
exports.flatMapEnd = continueWith;
exports.concatMap = concatMap;
exports.mergeConcurrently = mergeConcurrently;
exports.merge = merge$1;
exports.mergeArray = mergeArray;
exports.combine = combine;
exports.combineArray = combineArray;
exports.sample = sample;
exports.sampleArray = sampleArray;
exports.sampleWith = sampleWith; = zip;
exports.zipArray = zipArray;
exports.switchLatest = switchLatest;
exports.switch = switchLatest;
exports.filter = filter;
exports.skipRepeats = skipRepeats;
exports.distinct = skipRepeats;
exports.skipRepeatsWith = skipRepeatsWith;
exports.distinctBy = skipRepeatsWith;
exports.take = take;
exports.skip = skip;
exports.slice = slice;
exports.takeWhile = takeWhile;
exports.skipWhile = skipWhile;
exports.takeUntil = takeUntil;
exports.until = takeUntil;
exports.skipUntil = skipUntil;
exports.since = skipUntil;
exports.during = during;
exports.delay = delay$1;
exports.timestamp = timestamp;
exports.throttle = throttle;
exports.debounce = debounce;
exports.fromPromise = fromPromise;
exports.awaitPromises = awaitPromises;
exports.await = awaitPromises;
exports.recoverWith = recoverWith;
exports.flatMapError = flatMapError;
exports.throwError = throwError;
exports.multicast = multicast;
exports.defaultScheduler = defaultScheduler;
exports.PropagateTask = PropagateTask;
exports.fromEvent = fromEvent;
exports.unfold = unfold;
exports.iterate = iterate;
exports.generate = generate;
Object.defineProperty(exports, '__esModule', { value: true });
var Promise = CreedPromise; // provide a real local Promise impl
"name": "most-with-creed",
"version": "1.0.0",
"description": "Create a most.js module with Creed's blazingly fast promise shim.",
"main": "index.js",
"scripts": {
"build": "rollup -c",
"test": "echo \"Error: no test specified\" && exit 1"
"repository": {
"type": "git",
"url": "git+ssh://[email protected]/8c36447483eef5d6f6ce2d96096d3f92.git"
"author": "[email protected]",
"license": "MIT",
"bugs": {
"url": ""
"homepage": "",
"dependencies": {
"creed": "^1.2.1",
"most": "^1.2.2"
"devDependencies": {
"rollup": "^0.41.6",
"rollup-plugin-commonjs": "^8.0.2",
"rollup-plugin-node-resolve": "^3.0.0"
import nodeResolve from 'rollup-plugin-node-resolve'
import commonjs from 'rollup-plugin-commonjs'
export default {
entry: 'index.js',
dest: 'most-with-creed.js',
format: 'umd', // can be 'amd', 'cjs', 'umd', 'iife', etc.
intro: 'var Promise = function () {}; // create a phony local promise impl',
outro: 'var Promise = CreedPromise; // provide a real local Promise impl',
moduleName: 'most', // this is needed for umd or iife
plugins: [
nodeResolve({ preferBuiltins: true }),
