Last active
November 28, 2017 16:46
-
-
Save romanmiller/e126ceb5ff0588c39966ba69c9ece3c2 to your computer and use it in GitHub Desktop.
ReactiveKit + BrightFutures (Signal+Extensions)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension SignalProtocol { | |
func with<T:AnyObject>(weak: T)->Signal<(Element, T), Error> { | |
weak var w = weak | |
return Signal { observer in | |
return self.observe { event in | |
switch event { | |
case .next(let element): | |
if let w = w { | |
observer.next((element, w)) | |
} | |
case .completed: | |
observer.completed() | |
default: | |
break | |
} | |
} | |
} | |
} | |
} | |
extension SignalProtocol where Element == Void { | |
func with<T:AnyObject>(weak: T)->Signal<(T), Error> { | |
weak var w = weak | |
return Signal { observer in | |
return self.observe { event in | |
switch event { | |
case .next(_): | |
if let w = w { | |
observer.next(w) | |
} | |
case .completed: | |
observer.completed() | |
default: | |
break | |
} | |
} | |
} | |
} | |
} | |
extension SignalProtocol { | |
func flatMap<T>(_ transform: @escaping (Element) -> BrightFutures.Future<T, Error>) -> Signal<T, Error> { | |
return Signal<T, Error> { observer in | |
return self.observe { event in | |
switch event { | |
case .failed(let error): | |
observer.failed(error) | |
case .completed: | |
observer.completed() | |
case .next(let element): | |
transform(element) | |
.onSuccess{ v in | |
observer.next(v) | |
} | |
.onFailure { | |
observer.failed($0) | |
} | |
} | |
} | |
} | |
} | |
} | |
extension SignalProtocol { | |
func zip<T, E>(O: BrightFutures.Future<T, E>) -> Signal<(Element, T), E> where E == Error { | |
return Signal{ observer in | |
var left: Element? = nil | |
var error: E? = nil | |
O.onComplete{ res in | |
switch res { | |
case .success(let val): | |
if let element = left { | |
observer.next( (element, val) ) | |
} else if let error = error { | |
observer.failed(error) | |
} | |
case .failure(let error): observer.failed(error) | |
} | |
} | |
let lock = NSRecursiveLock(name: "zip") | |
return self.observe { event in | |
lock.lock(); defer { lock.unlock() } | |
switch event { | |
case .next(let element): | |
if let res = O.result { | |
switch res { | |
case .success(let val): observer.next( (element, val) ) | |
case .failure(let error): observer.failed(error) | |
} | |
} | |
else { | |
left = element | |
} | |
case .failed(let err): | |
error = err | |
observer.failed(err) | |
case .completed: | |
observer.completed() | |
} | |
} | |
} | |
} | |
} | |
extension SignalProtocol where Error==ReactiveKit.NoError { | |
func zip<T, E>(O: BrightFutures.Future<T, E>) -> Signal<(Element, T), E> { | |
return Signal{ observer in | |
var left: Element? = nil | |
O.onComplete{ res in | |
switch res { | |
case .success(let val): | |
if let element = left { | |
observer.next( (element, val) ) | |
} | |
case .failure(let error): observer.failed(error) | |
} | |
} | |
let lock = NSRecursiveLock(name: "zip") | |
return self.observe { event in | |
lock.lock(); defer { lock.unlock() } | |
switch event { | |
case .next(let element): | |
if let res = O.result { | |
switch res { | |
case .success(let val): observer.next( (element, val) ) | |
case .failure(let error): observer.failed(error) | |
} | |
} | |
else { | |
left = element | |
} | |
case .failed(_): break; | |
case .completed: | |
observer.completed() | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment