Skip to content

Instantly share code, notes, and snippets.

@jdelight
Last active September 30, 2015 11:55
Show Gist options
  • Save jdelight/24dc75a2982e1972a501 to your computer and use it in GitHub Desktop.
Save jdelight/24dc75a2982e1972a501 to your computer and use it in GitHub Desktop.
Fake observable based on rxJS API.
/*
Takes an array of items with each "," representing 100ms.
Subscribers can map or filter over the Observable.
Only when forEach is called do they start
getting data (with the transformations applied) over time.
Usage:
var subscriber = FakeObservable([10, , , 3, , , 8, , , , , 2, 18, 9]).subscribe();
subscriber.filter(function (item) {
return item > 5 && item < 10;
}).map(function (item) {
return item * 100;
});
// subscriber.forEach(callback, onErrorCallback, onCompleteCallback);
subscriber.forEach(function (item) {
console.log(item);
}, function (error) {
console.log('Error', error);
}, function () {
console.log('I\'m done!');
});
// Outputs:
** this will run till completion **
.
.
.
.
800
.
.
.
.
900
I'm done
*/
function FakeObservable(itterable) {
function Emitter(itterable) {
var self = this;
var counter = 0;
var total = itterable.length;
var forever = [].concat(itterable).splice(-2).join("") === "";
(forever) ? console.warn("** this will run forever **") : console.info("** this will run till completion **");
self.emitter = undefined;
self.done = undefined;
function getNext() {
return itterable[counter++];
}
function next() {
var item = getNext();
if (counter > total) {
if (!forever) {
return self.done();
}
}
if (item) {
self.emitter(item);
next();
} else {
console.log('.');
setTimeout(function () {
next()
}, 100);
}
}
this.subscribe = function () {
return new Subscriber(self);
};
this.start = function () {
next();
};
}
function Subscriber(obs) {
var self = this;
var fns = [];
var numEvents = 0;
function reducer(callback, item) {
var result = fns.reduce(function (acc, next) {
return next(acc);
}, item);
(result) ? callback(result) : null;
}
function register(callback) {
fns.push(callback);
return self;
}
function createReducer(callback) {
return function(item) {
numEvents++;
return reducer(callback, item);
};
}
self.forEach = function (callback, onError, onComplete) {
if (!obs.emitter) {
self.dispose = function () {
obs.done();
obs.emitter = function(){};
obs.error = function(){};
obs.done = function(){};
};
obs.emitter = createReducer(callback);
obs.error = onError;
obs.done = onComplete;
obs.start();
}
};
self.map = function (callback) {
return register(function map(item) {
if (Array.isArray(item)) {
return item.map(callback);
}
// todo handle objects
return callback(item);
});
};
self.filter = function (callback) {
return register(function filter(item) {
if (callback(item)) {
return item;
}
});
};
self.take = function (limit) {
return self.takeUntil(function() {
return numEvents > limit;
});
};
self.takeUntil = function (callback) {
return register(function takeUntil(item) {
if (callback(item)) {
return self.dispose();
}
return item;
});
};
}
return new Emitter(itterable);
}
// Each , represents 100ms
//var observable = FakeObservable([[1, 2, 3], , , , , , [4, 5, 6], , , , , , , , , , , [1, 2]]);
//var subscriber = observable.subscribe();
//
//subscriber.map(function (item) {
// return item + 10;
//}).filter(function (item) {
// return item.indexOf(1) > -1 || item.indexOf(11) > -1;
//}).map(function (item) {
// return item - 10;
//});
//subscriber.forEach(function (item) {
// console.log(item);
//}, function (error) {
// console.log('Error', error);
//}, function () {
// console.log('I\'m done!');
//});
/*
** this will run till completion **
[ 1, 2, 3 ]
.
.
.
.
.
undefined
.
.
.
.
.
.
.
.
.
.
[ 1, 2 ]
I'm done!
*/
var multiplier = FakeObservable([1, 2, 3, , , , , 4, 5, 6]).subscribe();
multiplier.map(function (item) {
return item * item;
}).take(4);
multiplier.forEach(function (item) {
console.log(item);
// another way unsubscribe
/*
if (item > 20) {
multiplier.dispose();
}
*/
}, function (error) {
console.log('Error', error);
}, function () {
console.log('I\'m done!');
});
/*
** this will run till completion **
1
4
9
.
.
.
.
16
25
36
*/
// Add three empty items to a collection to make the event continue forever (e.g. like a mouse drag event)
//var perpetualEvent = FakeObservable([1, 2, 3, , , , , 4, 5, 6, , ,]).subscribe();
//
//perpetualEvent.forEach(function (item) {
// console.log(item);
//});
/*
** this will run forever **
1
2
3
.
.
.
.
4
5
6
.
.
.
.
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment