Last active
November 4, 2017 06:50
-
-
Save softwarespot/72e0dfc0ff59be99063d4256be97869e to your computer and use it in GitHub Desktop.
Observable implementation aka Feed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| function Feed(subscriber) { | |
| if (!(this instanceof Feed)) { | |
| throw new TypeError('"Feed" cannot be called as a function. Use the "new" keyword to construct a new "Feed" object'); | |
| } | |
| if (typeof subscriber !== 'function') { | |
| throw new TypeError('Invalid argument, expected "subscriber" to be a function data type'); | |
| } | |
| this._subscriber = subscriber | |
| } | |
| Feed.prototype.subscribe = function (onNext, onError, onComplete) { | |
| // The first argument can either be an Observer interface or an "onNext" function | |
| var observer = typeof onNext !== 'function' && Object(onNext) === onNext ? onNext : { | |
| next: onNext, | |
| error: onError, | |
| complete: onComplete | |
| } | |
| // SubscriptionObserver interface | |
| var subscriptionObserver = {}; | |
| Object.defineProperties(subscriptionObserver, { | |
| closed: { | |
| get: Object.hasOwnProperty.bind(subscriptionObserver, '_hasClosed') | |
| } | |
| }); | |
| ['unsubscribe', 'next', 'error', 'complete'] | |
| .forEach(function (method) { | |
| Object.defineProperty(subscriptionObserver, method, { | |
| value: _tryMethod.bind(subscriptionObserver, observer, method) | |
| }); | |
| }); | |
| // Subscription interface | |
| var subscription = Object.defineProperties({}, { | |
| closed: { | |
| get: Object.hasOwnProperty.bind(subscriptionObserver, '_hasClosed') | |
| }, | |
| unsubscribe: { | |
| value: subscriptionObserver.unsubscribe | |
| } | |
| }); | |
| _tryMethod.call(subscriptionObserver, observer, 'start', subscriptionObserver); | |
| if (!subscriptionObserver.closed) { | |
| try { | |
| subscriptionObserver._cleanup = this._subscriber(subscriptionObserver); | |
| // The "cleanup" function is optional, but if it's defined and not a function data type, then throw a TypeError | |
| if (subscriptionObserver._cleanup && typeof subscriptionObserver._cleanup !== 'function') { | |
| throw new TypeError('Invalid argument, expected "cleanup" to be a function data type'); | |
| } | |
| } catch (ex) { | |
| subscriptionObserver.error(ex); | |
| } | |
| } | |
| return subscription; | |
| }; | |
| // "this" is bound to the subscription observer object | |
| function _tryMethod(observer, method, value) { | |
| if (!this.closed) { | |
| try { | |
| if (method !== 'unsubscribe') { | |
| observer[method].apply(observer, method !== 'complete' ? [value] : []); | |
| } else { | |
| // Ensure the "_hasClosed" property cannot be mutated anymore | |
| Object.defineProperty(this, '_hasClosed', {}); | |
| this._cleanup(); | |
| } | |
| } catch (ex) { | |
| // Ignore | |
| } | |
| if (method === 'error' || method === 'complete') { | |
| this.unsubscribe(); | |
| } else if (method === 'unsubscribe') { | |
| // Ensure the "_cleanup" property cannot be mutated anymore | |
| Object.defineProperty(this, '_cleanup', {}); | |
| } | |
| } | |
| } | |
| // Feed Example(s) | |
| // Toggle the specific example | |
| var example = 3; | |
| switch (example) { | |
| case 1: | |
| // Basic example | |
| var subscription = new Feed(function (observer) { | |
| var id = 0; | |
| var timeout = setInterval(function () { | |
| observer.next(id); | |
| id += 1; | |
| if (id === 10) { | |
| observer.complete(); | |
| } | |
| }, 5); | |
| return function () { | |
| console.log('unsubscribe::'); | |
| clearTimeout(timeout); | |
| }; | |
| }) | |
| .subscribe({ | |
| id: 5, | |
| start: function () { | |
| console.log('start::', this.id); | |
| }, | |
| next: function (value) { | |
| console.log('next::', value, this.id); | |
| // Demonstrates "this" is bound to the object passed to the subscribe function | |
| this.id += 1; | |
| }, | |
| error: function (err) { | |
| console.error('error::', err); | |
| }, | |
| complete: function () { | |
| console.log('complete::'); | |
| } | |
| }); | |
| setTimeout(function () { | |
| console.log('closed::', subscription.closed); | |
| subscription.unsubscribe(); | |
| console.log('closed::', subscription.closed); | |
| }, 40); | |
| break; | |
| case 2: | |
| // Map operator example | |
| Feed.prototype.map = function (fn) { | |
| if (typeof fn !== 'function') { | |
| throw new TypeError('Invalid argument, expected "fn" to be a function data type'); | |
| } | |
| var previous = this; | |
| return new this.constructor(function (observer) { | |
| return previous.subscribe({ | |
| next: function (value) { | |
| try { | |
| observer.next(fn(value)); | |
| } catch (ex) { | |
| observer.error(ex); | |
| } | |
| }, | |
| error: observer.error, | |
| complete: observer.complete | |
| }) | |
| .unsubscribe; | |
| }); | |
| }; | |
| new Feed(function (observer) { | |
| observer.next(1); | |
| observer.next(2); | |
| observer.next(3); | |
| observer.next(4); | |
| observer.next(5); | |
| observer.complete(); | |
| observer.next(6); | |
| }) | |
| .map(function (value) { | |
| return value * 2; | |
| }) | |
| .subscribe({ | |
| next: console.log.bind(console, 'next::'), | |
| error: console.log.bind(console, 'error::'), | |
| complete: console.log.bind(console, 'complete::') | |
| }); | |
| break; | |
| case 3: | |
| // Of example | |
| Feed.of = function () { | |
| var args = Array.from(arguments); | |
| return new this(function (observer) { | |
| // Iterate over the array until "closed" becomes true. This is a workaround for "breaking" iteration of the collection | |
| args.some(function (value) { | |
| return observer.closed || observer.next(value); | |
| }); | |
| observer.complete(); | |
| }); | |
| }; | |
| var subscription = Feed.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) | |
| .subscribe({ | |
| next: console.log.bind(console, 'next::'), | |
| error: console.log.bind(console, 'error::'), | |
| complete: console.log.bind(console, 'complete::') | |
| }); | |
| break; | |
| case 4: | |
| // forEach operator example | |
| Feed.prototype.forEach = function (fn, promiseCtor) { | |
| if (typeof fn !== 'function') { | |
| throw new TypeError('Invalid argument, expected "fn" to be a function data type'); | |
| } | |
| var previous = this; | |
| return new (promiseCtor || Promise)(function (resolve, reject) { | |
| previous.subscribe({ | |
| start: function (subscription) { | |
| this._subscription = subscription; | |
| }, | |
| next: function (value) { | |
| try { | |
| return !this._subscription.closed && fn(value); | |
| } catch (ex) { | |
| reject(ex); | |
| this._subscription.unsubscribe(); | |
| } | |
| }, | |
| error: reject, | |
| complete: resolve | |
| }); | |
| }); | |
| }; | |
| new Feed(function (observer) { | |
| observer.next(1); | |
| observer.next(2); | |
| observer.next(3); | |
| observer.next(4); | |
| observer.next(5); | |
| observer.complete(); | |
| observer.next(6); | |
| }) | |
| .forEach(function (value) { | |
| console.log('next::', value); | |
| }) | |
| .then(function () { | |
| console.log('complete::'); | |
| }) | |
| .catch(function (err) { | |
| console.log('error::', err); | |
| }); | |
| break; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment