Last active
December 16, 2015 21:20
-
-
Save mattpodwysocki/5499037 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wraps an event emitter in an Rx observable sequence.
 *
 * Every 'data' event is forwarded to the observer via onNext and every
 * 'error' event via onError. The sequence never calls onCompleted on its
 * own; add a listener for the emitter's end-of-stream event if completion
 * is needed.
 *
 * @param {Object} ev - Emitter to wrap. Assumed to expose `on(name, fn)` and
 *   either `off(name, fn)` or `removeListener(name, fn)` (verify against the
 *   emitter in use).
 * @returns {Rx.Observable} Observable that attaches its listeners on
 *   subscribe and detaches them when the subscription is disposed.
 */
Rx.Observable.asStream = function (ev) {
  return Rx.Observable.create(function (obs) {
    // Call onNext as many times as needed to yield new data.
    function handler(data) {
      obs.onNext(data);
    }

    // Errors from the emitter are surfaced through the observable as well.
    function errHandler(e) {
      obs.onError(e);
    }

    // Not every emitter exposes `off`; fall back to `removeListener`.
    function detach(eventName, listener) {
      if (typeof ev.off === 'function') {
        ev.off(eventName, listener);
      } else {
        ev.removeListener(eventName, listener);
      }
    }

    ev.on('data', handler);
    ev.on('error', errHandler);

    // When disposing, remove exactly the listeners registered above.
    return function () {
      detach('data', handler);
      detach('error', errHandler);
    };
  });
};
// Example usage: wrap `stream` in an observable, filter it, subscribe, and
// dispose. `stream` is expected to be defined by the surrounding code.

// Filtering
var rxStream = Rx.Observable.asStream(stream)
  .filter(function (x) {
    /* whatever predicate you wish */
    // Return a truthy value to keep `x`; a predicate with no return value
    // drops every item.
    return true;
  });

var handle = rxStream.subscribe(function (x) {
  // Handle values
}, function (err) {
  // Handle errors
});

// Clean up the handle, which removes the event listeners
handle.dispose();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment