Skip to content

Instantly share code, notes, and snippets.

@softwarespot
Last active November 4, 2017 06:50
Show Gist options
  • Select an option

  • Save softwarespot/72e0dfc0ff59be99063d4256be97869e to your computer and use it in GitHub Desktop.

Select an option

Save softwarespot/72e0dfc0ff59be99063d4256be97869e to your computer and use it in GitHub Desktop.
Observable implementation aka Feed
/**
 * Minimal Observable implementation (named "Feed").
 *
 * @constructor
 * @param {Function} subscriber - Invoked once per "subscribe()" call with a subscription observer
 * (exposing "next", "error", "complete", "unsubscribe" and a read-only "closed" flag). It may return
 * a cleanup function that is run when the subscription is closed
 * @throws {TypeError} If called without "new" or if "subscriber" is not a function
 */
function Feed(subscriber) {
    if (!(this instanceof Feed)) {
        throw new TypeError('"Feed" cannot be called as a function. Use the "new" keyword to construct a new "Feed" object');
    }

    if (typeof subscriber !== 'function') {
        throw new TypeError('Invalid argument, expected "subscriber" to be a function data type');
    }

    this._subscriber = subscriber;
}

/**
 * Subscribe to the feed.
 *
 * @param {Object|Function} onNext - Either an observer object (optional "start", "next", "error" and
 * "complete" methods, invoked with the observer as "this") or the "next" callback
 * @param {Function} [onError] - "error" callback, used only when "onNext" is not an observer object
 * @param {Function} [onComplete] - "complete" callback, used only when "onNext" is not an observer object
 * @returns {Object} Subscription with a read-only "closed" getter and an "unsubscribe" function
 */
Feed.prototype.subscribe = function (onNext, onError, onComplete) {
    // The first argument can either be an Observer interface or an "onNext" function
    var observer = typeof onNext !== 'function' && Object(onNext) === onNext ? onNext : {
        next: onNext,
        error: onError,
        complete: onComplete
    };

    // SubscriptionObserver interface
    var subscriptionObserver = {};

    // "closed" reports whether the "_hasClosed" own property exists; unsubscribing defines it
    var isClosed = Object.prototype.hasOwnProperty.bind(subscriptionObserver, '_hasClosed');
    Object.defineProperty(subscriptionObserver, 'closed', {
        get: isClosed
    });

    ['unsubscribe', 'next', 'error', 'complete'].forEach(function (method) {
        Object.defineProperty(subscriptionObserver, method, {
            value: _tryMethod.bind(subscriptionObserver, observer, method)
        });
    });

    // Subscription interface
    var subscription = Object.defineProperties({}, {
        closed: {
            get: isClosed
        },
        unsubscribe: {
            value: subscriptionObserver.unsubscribe
        }
    });

    // The optional "start" hook receives the subscription observer and may close it straight away
    _tryMethod.call(subscriptionObserver, observer, 'start', subscriptionObserver);

    if (!subscriptionObserver.closed) {
        try {
            var cleanup = this._subscriber(subscriptionObserver);

            // The "cleanup" function is optional, but if it's defined and not a function data type, then throw a TypeError
            if (cleanup && typeof cleanup !== 'function') {
                throw new TypeError('Invalid argument, expected "cleanup" to be a function data type');
            }

            if (subscriptionObserver.closed) {
                // The subscriber closed the subscription synchronously (complete/error/unsubscribe) before it
                // could hand over its cleanup function, so run the cleanup now instead of dropping it
                if (cleanup) {
                    cleanup.call(subscriptionObserver);
                }
            } else {
                subscriptionObserver._cleanup = cleanup;
            }
        } catch (ex) {
            // No-op if the subscription is already closed
            subscriptionObserver.error(ex);
        }
    }

    return subscription;
};

/**
 * Dispatch "method" for a subscription observer; does nothing once the subscription is closed.
 * "this" is bound to the subscription observer object.
 *
 * @param {Object} observer - Observer whose handler (if any) is invoked
 * @param {string} method - One of "start", "next", "error", "complete" or "unsubscribe"
 * @param {*} [value] - Value passed to the handler (not passed for "complete")
 * @returns {undefined}
 */
function _tryMethod(observer, method, value) {
    if (this.closed) {
        return;
    }

    if (method === 'unsubscribe') {
        // Defining "_hasClosed" (non-writable and non-configurable by default) flips "closed" to true for good
        Object.defineProperty(this, '_hasClosed', {});

        var cleanup = this._cleanup;
        try {
            if (typeof cleanup === 'function') {
                cleanup.call(this);
            }
        } catch (ex) {
            // Deliberately best-effort: a failing cleanup must not propagate to whoever closed the subscription
        }

        // Release the cleanup function and ensure the "_cleanup" property cannot be assigned anymore
        Object.defineProperty(this, '_cleanup', {
            value: undefined,
            writable: false
        });
        return;
    }

    try {
        var handler = observer[method];

        // Every handler is optional
        if (typeof handler === 'function') {
            handler.apply(observer, method !== 'complete' ? [value] : []);
        }
    } catch (ex) {
        // Deliberately best-effort: a throwing handler must not break the producer
    }

    // "error" and "complete" are terminal notifications, so close the subscription afterwards
    if (method === 'error' || method === 'complete') {
        this.unsubscribe();
    }
}
// Feed Example(s)

// Toggle the specific example (1 to 4) to run
var example = 3;

switch (example) {
    case 1:
        // Basic example: emits an incrementing id every 5ms and completes once the id reaches 10
        var subscription = new Feed(function (observer) {
            var id = 0;
            var timer = setInterval(function () {
                observer.next(id);
                id += 1;
                if (id === 10) {
                    observer.complete();
                }
            }, 5);

            // Cleanup function, run when the subscription is closed
            return function () {
                console.log('unsubscribe::');

                // A timer created with setInterval() is cancelled with clearInterval()
                clearInterval(timer);
            };
        })
            .subscribe({
                id: 5,
                start: function () {
                    console.log('start::', this.id);
                },
                next: function (value) {
                    console.log('next::', value, this.id);

                    // Demonstrates "this" is bound to the object passed to the subscribe function
                    this.id += 1;
                },
                error: function (err) {
                    console.error('error::', err);
                },
                complete: function () {
                    console.log('complete::');
                }
            });

        // Unsubscribe explicitly after 40ms, logging the "closed" state before and after
        setTimeout(function () {
            console.log('closed::', subscription.closed);
            subscription.unsubscribe();
            console.log('closed::', subscription.closed);
        }, 40);
        break;

    case 2:
        // Map operator example: returns a new feed whose values are the results of calling "fn" on each source value
        Feed.prototype.map = function (fn) {
            if (typeof fn !== 'function') {
                throw new TypeError('Invalid argument, expected "fn" to be a function data type');
            }

            var previous = this;
            return new this.constructor(function (observer) {
                // The source subscription's "unsubscribe" is returned as the cleanup function of the mapped feed
                return previous.subscribe({
                    next: function (value) {
                        try {
                            observer.next(fn(value));
                        } catch (ex) {
                            // A throwing "fn" is reported through the error channel
                            observer.error(ex);
                        }
                    },
                    error: observer.error,
                    complete: observer.complete
                })
                    .unsubscribe;
            });
        };

        new Feed(function (observer) {
            observer.next(1);
            observer.next(2);
            observer.next(3);
            observer.next(4);
            observer.next(5);
            observer.complete();

            // Ignored, as the subscription is closed after "complete"
            observer.next(6);
        })
            .map(function (value) {
                return value * 2;
            })
            .subscribe({
                next: console.log.bind(console, 'next::'),
                error: console.log.bind(console, 'error::'),
                complete: console.log.bind(console, 'complete::')
            });
        break;

    case 3:
        // Of example: creates a feed which emits each argument in order and then completes
        Feed.of = function () {
            var args = Array.from(arguments);
            return new this(function (observer) {
                // Iterate over the array until "closed" becomes true. This is a workaround for "breaking" iteration of the collection
                args.some(function (value) {
                    return observer.closed || observer.next(value);
                });
                observer.complete();
            });
        };

        var subscription = Feed.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .subscribe({
                next: console.log.bind(console, 'next::'),
                error: console.log.bind(console, 'error::'),
                complete: console.log.bind(console, 'complete::')
            });
        break;

    case 4:
        // forEach operator example: calls "fn" for each value and returns a promise which is resolved on
        // completion, or rejected on error or when "fn" throws
        Feed.prototype.forEach = function (fn, promiseCtor) {
            if (typeof fn !== 'function') {
                throw new TypeError('Invalid argument, expected "fn" to be a function data type');
            }

            var previous = this;
            return new (promiseCtor || Promise)(function (resolve, reject) {
                previous.subscribe({
                    start: function (subscription) {
                        // Kept on the observer so that "next" is able to close the subscription
                        this._subscription = subscription;
                    },
                    next: function (value) {
                        try {
                            return !this._subscription.closed && fn(value);
                        } catch (ex) {
                            reject(ex);
                            this._subscription.unsubscribe();
                        }
                    },
                    error: reject,
                    complete: resolve
                });
            });
        };

        new Feed(function (observer) {
            observer.next(1);
            observer.next(2);
            observer.next(3);
            observer.next(4);
            observer.next(5);
            observer.complete();

            // Ignored, as the subscription is closed after "complete"
            observer.next(6);
        })
            .forEach(function (value) {
                console.log('next::', value);
            })
            .then(function () {
                console.log('complete::');
            })
            .catch(function (err) {
                console.log('error::', err);
            });
        break;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment