Last active
March 28, 2018 08:04
-
-
Save monyschuk/07be2c45c2a7157522f7 to your computer and use it in GitHub Desktop.
Single file simple reactive programming classes for Swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//: Playground - noun: a place where people can play | |
import Cocoa | |
import XCPlayground | |
// Let asynchronous code run | |
XCPSetExecutionShouldContinueIndefinitely() | |
public func log<T>(x: T) { | |
print("\(x)") | |
} | |
// MARK: - | |
// MARK: Utilities | |
public final class Atomic<T> { | |
private var v: T | |
private var spinlock = OS_SPINLOCK_INIT | |
private func lock() { withUnsafeMutablePointer(&spinlock, OSSpinLockLock) } | |
private func unlock() { withUnsafeMutablePointer(&spinlock, OSSpinLockUnlock) } | |
public var value: T { | |
get { | |
let value: T | |
lock() | |
value = v | |
unlock() | |
return value | |
} | |
set(newValue) { | |
lock() | |
v = newValue | |
unlock() | |
} | |
} | |
public func swap(new: T) -> T { | |
let old: T | |
lock() | |
old = v | |
v = new | |
unlock() | |
return old | |
} | |
init(_ value: T) { | |
self.v = value | |
} | |
} | |
public struct TaggedArray<T>: CollectionType, SequenceType, ArrayLiteralConvertible, CustomStringConvertible, CustomDebugStringConvertible { | |
public typealias Tag = UInt64 | |
private var tag = Tag(0) | |
private var values = [(item: T, tag: Tag)]() | |
public mutating func append(item: T) -> Tag { | |
values.append((item: item, tag: ++tag)) | |
return tag | |
} | |
public mutating func insert(item: T, atIndex: Int) -> Tag { | |
values.insert((item: item, tag: ++tag), atIndex: atIndex) | |
return tag | |
} | |
public mutating func removeAtIndex(index: Int) -> T { | |
return values.removeAtIndex(index).item | |
} | |
public mutating func removeTag(tag: Tag) -> T? { | |
for i in (0..<values.count).reverse() { | |
if values[i].tag == tag { | |
let item = values[i].item | |
values.removeAtIndex(i) | |
return item | |
} | |
} | |
return nil | |
} | |
public mutating func removeAll(keepCapacity keepCapacity: Bool = true) { | |
values.removeAll(keepCapacity: keepCapacity) | |
} | |
// CollectionType | |
public var startIndex: Int { return values.startIndex } | |
public var endIndex: Int { return values.endIndex } | |
public subscript (index: Int) -> T { return values[index].item } | |
// SequenceType | |
public func generate() -> AnyGenerator<T> { | |
var generator = values.generate() | |
return anyGenerator({ | |
return generator.next()?.item | |
}) | |
} | |
// ArrayLiteralConvertible | |
public init(arrayLiteral elements: T...) { | |
for elt in elements { | |
append(elt) | |
} | |
} | |
// CustomStirngConvertible, CustomDebugStringConvertible | |
public var description: String { return values.description } | |
public var debugDescription: String { return values.debugDescription } | |
} | |
public struct Sink <T,E> { | |
let next: (T -> Void)? | |
let error: (E -> Void)? | |
let completed: (() -> Void)? | |
} | |
public struct Pipe <T,E> { | |
let sendNext: T -> Void | |
let sendError: E -> Void | |
let sendCompleted: () -> Void | |
} | |
// MARK: - | |
// MARK: Delayed Execution | |
public struct Scheduler { | |
private var queue: dispatch_queue_t | |
public func after(delay: Double, perform: ()->Void) { | |
dispatch_after( | |
dispatch_time( | |
DISPATCH_TIME_NOW, | |
Int64(delay * Double(NSEC_PER_SEC))), | |
queue) { | |
perform() | |
} | |
} | |
public func perform(perform: ()->Void) { | |
dispatch_async(queue) { | |
perform() | |
} | |
} | |
public static var mainQueueScheduler: Scheduler = Scheduler(queue: dispatch_get_main_queue()) | |
public static var backgroundScheduler: Scheduler = Scheduler(queue: dispatch_queue_create("Reactive.scheduler.background", DISPATCH_QUEUE_SERIAL)) | |
} | |
// MARK: - | |
// MARK: Observer Disposal | |
public struct Disposable { | |
public var dispose: ()->Void | |
} | |
// MARK: - | |
// MARK: Hot Signal Type | |
public final class Signal <T,E> { | |
private var terminated = false | |
private var sinks = TaggedArray<Sink<T, E>>() | |
private func sendNext(value: T) { | |
if terminated { return } | |
sinks | |
.flatMap() { $0.next } | |
.forEach() { $0(value) } | |
} | |
private func sendError(error: E) { | |
if terminated { return } | |
terminated = true | |
sinks | |
.flatMap() { $0.error } | |
.forEach() { $0(error) } | |
} | |
private func sendCompleted() { | |
if terminated { return } | |
terminated = true | |
sinks | |
.flatMap() { $0.completed } | |
.forEach() { $0() } | |
} | |
public func observe( | |
next next: (T -> Void)? = nil, | |
error: (E -> Void)? = nil, | |
completed: (() -> Void)? = nil) -> Disposable { | |
let tag = sinks.append(Sink(next: next, error: error, completed: completed)) | |
return Disposable { [weak self] in self?.sinks.removeTag(tag) } | |
} | |
public func next(next: (T->Void)) -> Signal<T,E> { | |
observe(next: next, error: nil, completed: nil) | |
return self | |
} | |
public func error(error: (E->Void)) -> Signal<T,E> { | |
observe(next: nil, error: error, completed: nil) | |
return self | |
} | |
public func completed(completed: (()->Void)) -> Signal<T,E> { | |
observe(next: nil, error: nil, completed: completed) | |
return self | |
} | |
public func onNext(next: (Void->Void)) -> Signal<T,E> { | |
observe( | |
next: { _ in | |
next() | |
}, | |
error: nil, | |
completed: nil) | |
return self | |
} | |
public func onError(error: (Void->Void)) -> Signal<T,E> { | |
observe( | |
next: nil, | |
error: { _ in | |
error() | |
}, | |
completed: nil) | |
return self | |
} | |
public func map<U>(transform: T -> U) -> Signal<U,E> { | |
let (result, pipe) = Signal<U,E>.create() | |
self.observe( | |
next: { value in | |
pipe.sendNext(transform(value)) | |
}, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
return result | |
} | |
public func reduce<U>(initial: U, combine: (U,T) -> U) -> Signal<U,E> { | |
var prev = initial | |
let (result, pipe) = Signal<U,E>.create() | |
self.observe( | |
next: { value in | |
prev = combine(prev, value) | |
pipe.sendNext(prev) | |
}, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
return result | |
} | |
public func filter(predicate: T -> Bool) -> Signal<T,E> { | |
let (result, pipe) = Signal<T,E>.create() | |
self.observe( | |
next: { value in | |
if predicate(value) { | |
pipe.sendNext(value) | |
} | |
}, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
return result | |
} | |
public func combinePrevious(initial: T) -> Signal<(T,T),E> { | |
var prev = initial | |
let (result, pipe) = Signal<(T,T),E>.create() | |
self.observe( | |
next: { value in | |
pipe.sendNext((prev, value)) | |
prev = value | |
}, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
return result | |
} | |
public func skipRepeats(equal: (T, T) -> Bool) -> Signal<T,E> { | |
return self | |
.map {Optional($0)} | |
.combinePrevious(nil) | |
.filter { (a, b) in | |
if let a = a, b = b where equal(a, b) { | |
return false | |
} else { | |
return true | |
} | |
} | |
.map { $0.1! } | |
} | |
public func throttle(delay: Double, onScheduler scheduler: Scheduler = Scheduler.mainQueueScheduler) -> Signal<T,E> { | |
let (result, pipe) = Signal<T,E>.create() | |
let last: Atomic<T?> = Atomic(nil) | |
self.observe( | |
next: { value in | |
let old = last.swap(value) | |
if old == nil { | |
scheduler.after(delay) { () -> Void in | |
if let value = last.swap(nil) { | |
pipe.sendNext(value) | |
} | |
} | |
} | |
}, | |
error: { error in | |
scheduler.perform { | |
if let value = last.swap(nil) { | |
pipe.sendNext(value) | |
} | |
pipe.sendError(error) | |
} | |
}, | |
completed: { | |
scheduler.perform { | |
if let value = last.swap(nil) { | |
pipe.sendNext(value) | |
} | |
pipe.sendCompleted() | |
} | |
} | |
) | |
return result | |
} | |
public static func create() -> (Signal<T,E>, Pipe<T,E>) { | |
let signal = Signal<T,E>() | |
let pipe = Pipe( | |
sendNext: signal.sendNext, | |
sendError: signal.sendError, | |
sendCompleted: signal.sendCompleted) | |
return (signal, pipe) | |
} | |
deinit { | |
sendCompleted() | |
} | |
} | |
public extension Signal where T: Equatable { | |
public func skipRepeats() -> Signal<T,E> { | |
return self.skipRepeats(==) | |
} | |
} | |
public protocol OptionalType { | |
typealias T | |
var optional: T? { get } | |
} | |
extension Optional: OptionalType { | |
public var optional: Wrapped? { | |
return self | |
} | |
} | |
public extension Signal where T: OptionalType { | |
public func ignoreNil() -> Signal<T.T,E> { | |
return self | |
.filter { $0.optional != nil } | |
.map { $0.optional! } | |
} | |
} | |
public extension Signal where T: SequenceType { | |
public func mapElements<U>(transform: T.Generator.Element -> U) -> Signal<[U],E> { | |
let (result, pipe) = Signal<[U],E>.create() | |
self.observe( | |
next: { value in | |
pipe.sendNext(value.map(transform)) | |
}, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
return result | |
} | |
public func filterElements(includeElement: T.Generator.Element -> Bool) -> Signal<[T.Generator.Element],E> { | |
let (result, pipe) = Signal<[T.Generator.Element],E>.create() | |
self.observe( | |
next: { value in | |
pipe.sendNext(value.filter(includeElement)) | |
}, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
return result | |
} | |
} | |
public extension Signal where T: SequenceType, T.Generator.Element: Hashable { | |
public func addedElements(added: Set<T.Generator.Element> -> Void) -> Signal<T, E> { | |
return changedElements { change in | |
if !change.added.isEmpty { | |
added(change.added) | |
} | |
} | |
} | |
public func removedElements(removed: Set<T.Generator.Element> -> Void) -> Signal<T, E> { | |
return changedElements { change in | |
if !change.removed.isEmpty { | |
removed(change.removed) | |
} | |
} | |
} | |
public func changedElements(changed: (added: Set<T.Generator.Element>, removed: Set<T.Generator.Element>) -> Void) -> Signal<T, E> { | |
mapElements { $0 } | |
.combinePrevious([]) | |
.next { old, new in | |
let addedElements = Set(new).subtract(old) | |
let removedElements = Set(old).subtract(new) | |
if !addedElements.isEmpty || !removedElements.isEmpty { | |
changed(added: addedElements, removed: removedElements) | |
} | |
} | |
return self | |
} | |
} | |
// MARK: - | |
// MARK: Cold Signal Type | |
public struct SignalProducer<T,E> { | |
public let initiate: (Pipe<T,E>, Disposable?) -> Disposable? | |
public func start(@noescape setup: Signal<T,E> -> Void) -> Disposable? { | |
let (signal, pipe) = Signal<T,E>.create(); setup(signal); return initiate(pipe, nil) | |
} | |
public func startObserving( | |
next next: (T->Void)? = nil, | |
error: (E->Void)? = nil, | |
completed: (()->Void)? = nil) -> Disposable? { | |
return start() { signal in signal.observe(next: next, error: error, completed: completed) } | |
} | |
public func startObservingNext(next: T->Void) -> Disposable? { | |
return startObserving(next: next) | |
} | |
public func startObservingError(error: E->Void) -> Disposable? { | |
return startObserving(error: error) | |
} | |
public func startObservingCompleted(completed: ()->Void) -> Disposable? { | |
return startObserving(completed: completed) | |
} | |
// creates a new producer which applies the given transform to every signal it initiates | |
public func lift<U,F>(transform: Signal<T,E> -> Signal<U,F>) -> SignalProducer<U,F> { | |
return SignalProducer<U,F> { pipe, disposable in | |
self.start { signal in | |
transform(signal) | |
.observe( | |
next: pipe.sendNext, | |
error: pipe.sendError, | |
completed: pipe.sendCompleted) | |
} | |
return disposable | |
} | |
} | |
public func map<U>(transform: T -> U) -> SignalProducer<U,E> { | |
return lift { signal in signal.map(transform) } | |
} | |
public func reduce<U>(initial: U, combine: (U,T) -> U) -> SignalProducer<U,E> { | |
return lift { signal in signal.reduce(initial, combine: combine) } | |
} | |
public func filter(predicate: T -> Bool) -> SignalProducer<T,E> { | |
return lift { signal in signal.filter(predicate) } | |
} | |
public func combinePrevious(initial: T) -> SignalProducer<(T,T),E> { | |
return lift { signal in signal.combinePrevious(initial) } | |
} | |
public func skipRepeats(equal: (T, T) -> Bool) -> SignalProducer<T,E> { | |
return lift { signal in signal.skipRepeats(equal) } | |
} | |
public func throttle(duration: Double, onScheduler scheduler: Scheduler = Scheduler.mainQueueScheduler) -> SignalProducer<T,E> { | |
return lift { signal in signal.throttle(duration, onScheduler: scheduler) } | |
} | |
public static func next(value: T) -> SignalProducer<T, E> { | |
return SignalProducer<T,E> { pipe, disposable in | |
pipe.sendNext(value) | |
pipe.sendCompleted() | |
return disposable | |
} | |
} | |
public static func error(error: E) -> SignalProducer<T,E> { | |
return SignalProducer<T,E> { pipe, disposable in | |
pipe.sendError(error) | |
return disposable | |
} | |
} | |
public static func array(array: [T]) -> SignalProducer<T,E> { | |
return SignalProducer<T,E> { pipe, disposable in | |
for item in array { pipe.sendNext(item) } | |
pipe.sendCompleted() | |
return disposable | |
} | |
} | |
} | |
public enum RetryInterval { | |
case Constant(Double) // base | |
case Exponential(Double, Double) // base, max | |
} | |
public extension SignalProducer { | |
public func retry(times: Int, interval: RetryInterval, onScheduler scheduler: Scheduler = Scheduler.mainQueueScheduler) -> SignalProducer<T,E> { | |
return SignalProducer { pipe, disposable in | |
let cancelled = Atomic(false) | |
func attemptStart(attempt: Int, times: Int) { | |
if cancelled.value { return } | |
// NOTE: the following may be received some time | |
// after attemptStart() is called, so in each we test | |
// for cancelled state again before proceeding with actions | |
self.start { signal in | |
signal.observe( | |
next: { value in | |
if cancelled.value { return } | |
scheduler.perform { | |
pipe.sendNext(value) | |
} | |
}, | |
error: { error in | |
if cancelled.value { return } | |
if attempt >= times { | |
scheduler.perform { | |
pipe.sendError(error) | |
} | |
} else { | |
switch interval { | |
case let .Constant(delay): | |
scheduler.after(delay) { | |
attemptStart(attempt + 1, times: times) | |
} | |
break; | |
case let .Exponential(base, maximum): | |
let delay = min(pow(base, Double(attempt + 1)), maximum) | |
scheduler.after(delay) { | |
attemptStart(attempt + 1, times: times) | |
} | |
} | |
} | |
}, | |
completed: { | |
if cancelled.value { return } | |
scheduler.perform { | |
pipe.sendCompleted() | |
} | |
}) | |
} | |
} | |
attemptStart(0, times: times) | |
return Disposable { | |
cancelled.value = true | |
disposable?.dispose() | |
} | |
} | |
} | |
} | |
public extension SignalProducer where T: Equatable { | |
public func skipRepeats() -> SignalProducer<T,E> { | |
return lift { signal in signal.skipRepeats(==) } | |
} | |
} | |
public extension SignalProducer where T: OptionalType { | |
public func ignoreNil() -> SignalProducer<T.T,E> { | |
return lift { signal in signal.ignoreNil() } | |
} | |
} | |
public extension SignalProducer where T: SequenceType { | |
public func mapElements<U>(transform: T.Generator.Element -> U) -> SignalProducer<[U],E> { | |
return lift { signal in signal.mapElements(transform) } | |
} | |
public func filterElements(includeElement: T.Generator.Element -> Bool) -> SignalProducer<[T.Generator.Element],E> { | |
return lift { signal in signal.filterElements(includeElement) } | |
} | |
} | |
// the following three may be borked since the underlying signal calls implement | |
// side effects on the called signal, rather than transformations of the called signal | |
public extension SignalProducer where T: SequenceType, T.Generator.Element: Hashable { | |
public func addedElements(added: Set<T.Generator.Element> -> Void) -> SignalProducer<T, E> { | |
return lift { signal in signal.addedElements(added) } | |
} | |
public func removedElements(removed: Set<T.Generator.Element> -> Void) -> SignalProducer<T, E> { | |
return lift { signal in signal.removedElements(removed) } | |
} | |
public func changedElements(changed: (added: Set<T.Generator.Element>, removed: Set<T.Generator.Element>) -> Void) -> SignalProducer<T, E> { | |
return lift { signal in signal.changedElements(changed) } | |
} | |
} | |
public struct SignalSource<T,E> { | |
public var pipe: Pipe<T,E> | |
public var signal: Signal<T,E> | |
public init() { | |
let (signal, pipe) = Signal<T,E>.create() | |
self.pipe = pipe; | |
self.signal = signal | |
} | |
public func sendNext(value: T) { | |
pipe.sendNext(value) | |
} | |
public func sendError(error: E) { | |
pipe.sendError(error) | |
} | |
public func sendCompleted() { | |
pipe.sendCompleted() | |
} | |
} | |
public final class Observable<T> { | |
public var value: T { | |
didSet { | |
for pipe in pipes { pipe.sendNext(value) } | |
} | |
} | |
public var signal: SignalProducer<T,Void> { | |
return SignalProducer { [weak self] pipe, disposable in | |
if let next = self?.value { pipe.sendNext(next) } | |
if let tag = self?.pipes.append(pipe) { return Disposable { self?.pipes.removeTag(tag); disposable?.dispose() } } | |
return disposable | |
} | |
} | |
private var pipes = TaggedArray<Pipe<T,Void>>() | |
public init(_ value: T) { | |
self.value = value | |
} | |
deinit { | |
for pipe in pipes { pipe.sendCompleted() } | |
} | |
} | |
prefix operator ^ {} | |
public prefix func ^<T>(rhs: Observable<T>) -> T { | |
return rhs.value | |
} | |
public func ^=<T>(lhs: Observable<T>, rhs: T) { | |
lhs.value = rhs | |
} | |
// MARK: - | |
// MARK: Foundation Extensions | |
public extension NSNotificationCenter { | |
public func signal(name name: String? = nil, object: AnyObject? = nil, queue: NSOperationQueue? = nil) -> SignalProducer<NSNotification, Void> { | |
return SignalProducer { pipe, disposable in | |
let observer = self.addObserverForName(name, object: object, queue: queue) { notification in | |
pipe.sendNext(notification) | |
} | |
return Disposable { | |
self.removeObserver(observer) | |
disposable?.dispose() | |
} | |
} | |
} | |
} | |
private let defaultSessionError = NSError(domain: "Reactive.NSURLSession.signal", code: 1, userInfo: nil) | |
public extension NSURLSession { | |
public func signal(URL: NSURL) -> SignalProducer<(data: NSData, response: NSURLResponse), NSError> { | |
return signal(request: NSURLRequest(URL: URL)) | |
} | |
public func signal(request request: NSURLRequest) -> SignalProducer<(data: NSData, response: NSURLResponse), NSError> { | |
return SignalProducer { pipe, disposable in | |
let task = self.dataTaskWithRequest(request) { data, response, error in | |
if let data = data, response = response { | |
pipe.sendNext((data: data, response: response)) | |
pipe.sendCompleted() | |
} else { | |
pipe.sendError(error ?? defaultSessionError) | |
} | |
} | |
task.resume() | |
return Disposable { | |
task.cancel() | |
disposable?.dispose() | |
} | |
} | |
} | |
} | |
// MARK: - | |
// MARK: Sample Usage | |
let request = NSURLRequest(URL: NSURL(string: "http://www.apple.com")!) | |
NSURLSession.sharedSession().signal(request: request) | |
.retry(5, interval: .Constant(15)) // retry fetch 5 times every 15 sec | |
.filter { result in result.data.length > 5000 } // skip if data is less than 5k | |
.startObserving( | |
next: { result in log("got \(result.data.length) bytes") }, | |
error: { error in log("got \(error)") }, | |
completed: { log("got completed") } | |
)//?.dispose() // dispose will cancel the lot if desired | |
SignalProducer<String,Void>.array(["foo", "bar", "baz", "buq"]) | |
.throttle(0.1) // throttle results every 0.1sec. | |
.start() { | |
$0 .next { log("next \($0)") } | |
.completed { log("completed") } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment