This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Subscribes an element handler to this observable sequence.
///
/// The handler is wrapped in an `AnonymousObserver` whose closure reacts only
/// to `.Next` events; any other event reaching that closure is ignored here.
///
/// - parameter onNext: Action invoked with the payload of every `.Next` event.
/// - returns: The `Disposable` produced by `subscribeSafe` for this subscription.
@warn_unused_result(message="http://git.io/rxs.ud")
public func subscribeNext(onNext: (E) -> Void) -> Disposable {
    let observer = AnonymousObserver<E> { e in
        // Only element events carry a value to hand to the caller.
        if case .Next(let value) = e {
            onNext(value)
        }
    }
    return self.subscribeSafe(observer)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Event handler closure: unwraps the payload of a `.Next` event and passes it
// to `onNext`; events that are not `.Next` fall through without any action.
{ e in
    if case .Next(let value) = e {
        onNext(value)
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A single event of a sequence, generic over the element type it carries.
///
/// An event is either a produced element (`Next`), a termination with an
/// error (`Error`), or a successful completion (`Completed`).
public enum Event<Element> {
    /// Next element is produced.
    case Next(Element)

    /// Sequence terminated with an error.
    case Error(ErrorType)

    /// Sequence completed successfully.
    case Completed
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AnonymousObserver<ElementType> : ObserverBase<ElementType> { | |
typealias Element = ElementType | |
typealias EventHandler = Event<Element> -> Void | |
private let _eventHandler : EventHandler | |
init(_ eventHandler: EventHandler) { | |
#if TRACE_RESOURCES | |
AtomicIncrement(&resourceCount) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Subscribes `observer` by converting the receiver with `asObservable()` and
/// calling `subscribe` on the result.
///
/// - parameter observer: Observer whose element type `O.E` matches `E`.
/// - returns: The `Disposable` returned by the underlying `subscribe` call.
@warn_unused_result(message="http://git.io/rxs.ud")
func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
    return self.asObservable().subscribe(observer)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Subscribes `observer` while holding `_lock`.
///
/// The lock is taken before delegating to `_synchronized_subscribe` and is
/// released by the `defer` block when this method exits.
///
/// - parameter observer: Observer whose element type `O.E` matches `Element`.
/// - returns: The `Disposable` returned by `_synchronized_subscribe`.
public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
    _lock.lock()
    defer { _lock.unlock() }
    return _synchronized_subscribe(observer)
}
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable { | |
if _disposed { | |
observer.on(.Error(RxError.Disposed(object: self))) | |
return NopDisposable.instance | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public mutating func insert(element: T) -> BagKey { | |
_nextKey = _nextKey &+ 1 | |
#if DEBUG | |
_nextKey = _nextKey % 20 | |
#endif | |
if _nextKey == 0 { | |
_uniqueIdentity = Identity() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func on(event: Event<E>) { | |
switch event { | |
case .Next: | |
if _isStopped == 0 { | |
onCore(event) | |
} | |
case .Error, .Completed: | |
if !AtomicCompareAndSwap(0, 1, &_isStopped) { | |
return |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Forwards `event` to the `_eventHandler` closure.
///
/// - parameter event: The event to pass to the handler.
override func onCore(event: Event<Element>) {
    return _eventHandler(event)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct SubscriptionDisposable<T: SynchronizedUnsubscribeType> : Disposable { | |
private let _key: T.DisposeKey | |
private weak var _owner: T? | |
init(owner: T, key: T.DisposeKey) { | |
_owner = owner | |
_key = key | |
} | |
func dispose() { |