@Killavus
Created May 13, 2015 14:59
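# Assumption: RxJS 4.x (the `onNext` API) from the `rx` package; the original expects `Rx` in scope.
Rx = require 'rx'

class EventStream
  # Wraps an existing stream, or creates a fresh subject when none is given.
  constructor: (@options = {}, @stream = undefined) ->
    unless @stream?
      streamType = EventStream.subjectType(@options.repeating)
      @stream = new streamType()

  # Events travel through the stream as arrays: [eventName, args...].
  publish: (eventName, args...) =>
    @stream.onNext([eventName].concat(args))

  # Calls callback with the arguments of each matching event; returns the subscription.
  on: (eventName, callback) =>
    @stream
      .filter((ev) -> ev[0] is eventName)
      .subscribe((args) -> callback(args.slice(1)...))

  # Returns a new EventStream; fun receives the whole [eventName, args...] array.
  map: (fun) =>
    subjectType = EventStream.subjectType(@options.repeating)
    metaSubject = new subjectType()
    mapped = Rx.Observable.merge(@stream, metaSubject).map(fun)
    new EventStream({}, subjectType.create(metaSubject, mapped))

  # Combines several EventStreams into one; publishing on the result feeds metaSubject.
  @merge: (options, streams...) =>
    subjectType = EventStream.subjectType(options.repeating)
    metaSubject = new subjectType()
    eventStreams = streams.map (eventStream) -> eventStream.stream
    eventStreams.push(metaSubject)
    merged = Rx.Observable.merge(eventStreams...)
    new EventStream(options, subjectType.create(metaSubject, merged))

  # A repeating stream replays past events to new subscribers.
  @subjectType: (repeating) =>
    return Rx.ReplaySubject if repeating
    Rx.Subject

module.exports = EventStream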