Skip to content

Instantly share code, notes, and snippets.

@romanmiller
Last active November 28, 2017 16:46
Show Gist options
  • Save romanmiller/e126ceb5ff0588c39966ba69c9ece3c2 to your computer and use it in GitHub Desktop.
Save romanmiller/e126ceb5ff0588c39966ba69c9ece3c2 to your computer and use it in GitHub Desktop.
ReactiveKit + BrightFutures (Signal+Extensions)
extension SignalProtocol {
func with<T:AnyObject>(weak: T)->Signal<(Element, T), Error> {
weak var w = weak
return Signal { observer in
return self.observe { event in
switch event {
case .next(let element):
if let w = w {
observer.next((element, w))
}
case .completed:
observer.completed()
default:
break
}
}
}
}
}
extension SignalProtocol where Element == Void {
func with<T:AnyObject>(weak: T)->Signal<(T), Error> {
weak var w = weak
return Signal { observer in
return self.observe { event in
switch event {
case .next(_):
if let w = w {
observer.next(w)
}
case .completed:
observer.completed()
default:
break
}
}
}
}
}
extension SignalProtocol {
func flatMap<T>(_ transform: @escaping (Element) -> BrightFutures.Future<T, Error>) -> Signal<T, Error> {
return Signal<T, Error> { observer in
return self.observe { event in
switch event {
case .failed(let error):
observer.failed(error)
case .completed:
observer.completed()
case .next(let element):
transform(element)
.onSuccess{ v in
observer.next(v)
}
.onFailure {
observer.failed($0)
}
}
}
}
}
}
extension SignalProtocol {
func zip<T, E>(O: BrightFutures.Future<T, E>) -> Signal<(Element, T), E> where E == Error {
return Signal{ observer in
var left: Element? = nil
var error: E? = nil
O.onComplete{ res in
switch res {
case .success(let val):
if let element = left {
observer.next( (element, val) )
} else if let error = error {
observer.failed(error)
}
case .failure(let error): observer.failed(error)
}
}
let lock = NSRecursiveLock(name: "zip")
return self.observe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let element):
if let res = O.result {
switch res {
case .success(let val): observer.next( (element, val) )
case .failure(let error): observer.failed(error)
}
}
else {
left = element
}
case .failed(let err):
error = err
observer.failed(err)
case .completed:
observer.completed()
}
}
}
}
}
extension SignalProtocol where Error==ReactiveKit.NoError {
func zip<T, E>(O: BrightFutures.Future<T, E>) -> Signal<(Element, T), E> {
return Signal{ observer in
var left: Element? = nil
O.onComplete{ res in
switch res {
case .success(let val):
if let element = left {
observer.next( (element, val) )
}
case .failure(let error): observer.failed(error)
}
}
let lock = NSRecursiveLock(name: "zip")
return self.observe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next(let element):
if let res = O.result {
switch res {
case .success(let val): observer.next( (element, val) )
case .failure(let error): observer.failed(error)
}
}
else {
left = element
}
case .failed(_): break;
case .completed:
observer.completed()
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment