Skip to content

Instantly share code, notes, and snippets.

@monyschuk
Last active March 28, 2018 08:04
Show Gist options
  • Save monyschuk/07be2c45c2a7157522f7 to your computer and use it in GitHub Desktop.
Save monyschuk/07be2c45c2a7157522f7 to your computer and use it in GitHub Desktop.
Single file simple reactive programming classes for Swift
//: Playground - noun: a place where people can play
import Cocoa
import XCPlayground
// Let asynchronous code run: the samples at the bottom of this playground
// deliver their results from dispatch queues and URL session callbacks,
// i.e. after the top-level statements have already returned.
XCPSetExecutionShouldContinueIndefinitely()
/// Prints the textual form of `x` on its own line.
///
/// - parameter x: The value to print; it is rendered with string interpolation.
public func log<T>(x: T) {
    let rendered = "\(x)"
    print(rendered)
}
// MARK: -
// MARK: Utilities
/// A mutable box whose reads and writes are serialized by a spin lock.
public final class Atomic<T> {
    private var storage: T
    private var spinlock = OS_SPINLOCK_INIT

    /// - parameter value: The initial contents of the box.
    init(_ value: T) {
        self.storage = value
    }

    /// The boxed value. Each read and each write holds the lock for its duration.
    public var value: T {
        get {
            lock()
            defer { unlock() }
            return storage
        }
        set(newValue) {
            lock()
            defer { unlock() }
            storage = newValue
        }
    }

    /// Stores `new` and returns the value it replaced, within one lock acquisition.
    ///
    /// - parameter new: The value to store.
    /// - returns: The value held immediately before the store.
    public func swap(new: T) -> T {
        lock()
        defer { unlock() }
        let previous = storage
        storage = new
        return previous
    }

    private func lock() { withUnsafeMutablePointer(&spinlock, OSSpinLockLock) }
    private func unlock() { withUnsafeMutablePointer(&spinlock, OSSpinLockUnlock) }
}
/// An ordered collection that issues a fresh tag for every element added, so
/// an element can later be removed by tag rather than by position.
public struct TaggedArray<T>: CollectionType, SequenceType, ArrayLiteralConvertible, CustomStringConvertible, CustomDebugStringConvertible {
    public typealias Tag = UInt64

    /// The most recently issued tag; 0 until the first element is added.
    private var tag = Tag(0)
    /// The elements in order, each paired with the tag issued for it.
    private var values = [(item: T, tag: Tag)]()

    // MARK: ArrayLiteralConvertible

    /// Builds the array by appending each literal element in order.
    public init(arrayLiteral elements: T...) {
        for element in elements {
            append(element)
        }
    }

    // MARK: Mutation

    /// Adds `item` at the end and returns the tag issued for it.
    public mutating func append(item: T) -> Tag {
        tag += 1
        values.append((item: item, tag: tag))
        return tag
    }

    /// Inserts `item` at position `atIndex` and returns the tag issued for it.
    public mutating func insert(item: T, atIndex: Int) -> Tag {
        tag += 1
        values.insert((item: item, tag: tag), atIndex: atIndex)
        return tag
    }

    /// Removes and returns the element at `index`.
    public mutating func removeAtIndex(index: Int) -> T {
        let removed = values.removeAtIndex(index)
        return removed.item
    }

    /// Removes the element carrying `tag`, searching from the end.
    ///
    /// - returns: The removed element, or `nil` when no element has that tag.
    public mutating func removeTag(tag: Tag) -> T? {
        for position in values.indices.reverse() where values[position].tag == tag {
            return values.removeAtIndex(position).item
        }
        return nil
    }

    /// Removes every element; the tag counter is left as it is.
    public mutating func removeAll(keepCapacity keepCapacity: Bool = true) {
        values.removeAll(keepCapacity: keepCapacity)
    }

    // MARK: CollectionType

    public var startIndex: Int { return values.startIndex }
    public var endIndex: Int { return values.endIndex }
    public subscript (index: Int) -> T { return values[index].item }

    // MARK: SequenceType

    /// Returns a generator over the elements only (tags are not exposed).
    public func generate() -> AnyGenerator<T> {
        let snapshot = values
        var cursor = snapshot.startIndex
        return anyGenerator({
            guard cursor < snapshot.endIndex else { return nil }
            let item = snapshot[cursor].item
            cursor += 1
            return item
        })
    }

    // MARK: CustomStringConvertible, CustomDebugStringConvertible

    public var description: String { return values.description }
    public var debugDescription: String { return values.debugDescription }
}
/// The receiving side of a signal: up to three optional handlers, one per
/// kind of event. A `nil` handler means that kind of event is ignored.
public struct Sink<T, E> {
    /// Called with each value.
    let next: (T -> Void)?
    /// Called with the error that terminates the signal.
    let error: (E -> Void)?
    /// Called when the signal completes.
    let completed: (() -> Void)?
}
/// The sending side of a signal: three closures that push a value, an error,
/// or a completion event.
public struct Pipe<T, E> {
    /// Pushes a value.
    let sendNext: T -> Void
    /// Pushes an error.
    let sendError: E -> Void
    /// Pushes a completion event.
    let sendCompleted: () -> Void
}
// MARK: -
// MARK: Delayed Execution
/// Runs closures on a GCD dispatch queue, either as soon as possible or after
/// a delay.
public struct Scheduler {
    private var queue: dispatch_queue_t

    /// Scheduler backed by the main dispatch queue.
    public static var mainQueueScheduler: Scheduler = Scheduler(queue: dispatch_get_main_queue())
    /// Scheduler backed by a serial queue labelled "Reactive.scheduler.background".
    public static var backgroundScheduler: Scheduler = Scheduler(queue: dispatch_queue_create("Reactive.scheduler.background", DISPATCH_QUEUE_SERIAL))

    /// Enqueues `perform` on the scheduler's queue after `delay` seconds.
    ///
    /// - parameter delay: Delay in seconds, measured from now.
    /// - parameter perform: The work to run.
    public func after(delay: Double, perform: ()->Void) {
        let nanoseconds = Int64(delay * Double(NSEC_PER_SEC))
        let deadline = dispatch_time(DISPATCH_TIME_NOW, nanoseconds)
        dispatch_after(deadline, queue) {
            perform()
        }
    }

    /// Enqueues `perform` asynchronously on the scheduler's queue.
    public func perform(perform: ()->Void) {
        let target = queue
        dispatch_async(target) {
            perform()
        }
    }
}
// MARK: -
// MARK: Observer Disposal
/// A handle for undoing a subscription or cancelling work.
public struct Disposable {
    /// The closure to call to undo or cancel whatever this handle stands for.
    public var dispose: () -> Void
}
// MARK: -
// MARK: Hot Signal Type
/// A hot stream of events: any number of values, ended by at most one error
/// or completion. Events are pushed through the `Pipe` returned by `create()`
/// and delivered to every sink registered with `observe`.
///
/// Once an error or completion has been delivered the signal is terminated:
/// later sends are ignored and all registered sinks are released.
public final class Signal <T,E> {
    private var terminated = false
    private var sinks = TaggedArray<Sink<T, E>>()

    // MARK: Sending (reachable only through the Pipe from `create()`)

    /// Delivers `value` to every `next` handler unless already terminated.
    private func sendNext(value: T) {
        if terminated { return }
        sinks
            .flatMap() { $0.next }
            .forEach() { $0(value) }
    }

    /// Terminates the signal and delivers `error` to every `error` handler.
    private func sendError(error: E) {
        if terminated { return }
        terminated = true
        let handlers: [E -> Void] = sinks.flatMap() { $0.error }
        // No sink can fire again after termination; release them now so the
        // closures (and whatever they capture) do not outlive their usefulness.
        sinks.removeAll(keepCapacity: false)
        handlers.forEach() { $0(error) }
    }

    /// Terminates the signal and calls every `completed` handler.
    private func sendCompleted() {
        if terminated { return }
        terminated = true
        let handlers: [() -> Void] = sinks.flatMap() { $0.completed }
        // See sendError: sinks are dead weight once the signal has terminated.
        sinks.removeAll(keepCapacity: false)
        handlers.forEach() { $0() }
    }

    // MARK: Observing

    /// Registers handlers for this signal's events.
    ///
    /// - parameter next: Called with each value.
    /// - parameter error: Called with the terminating error.
    /// - parameter completed: Called on completion.
    /// - returns: A disposable that unregisters the handlers. The signal is
    ///   referenced weakly, so the disposable does not keep it alive.
    public func observe(
        next next: (T -> Void)? = nil,
        error: (E -> Void)? = nil,
        completed: (() -> Void)? = nil) -> Disposable {
        // A terminated signal never delivers again, so storing the handlers
        // would only retain them for no purpose.
        if terminated { return Disposable(dispose: {}) }
        let tag = sinks.append(Sink(next: next, error: error, completed: completed))
        return Disposable { [weak self] in self?.sinks.removeTag(tag) }
    }

    /// Attaches a value handler as a side effect and returns this same signal.
    public func next(next: (T->Void)) -> Signal<T,E> {
        observe(next: next, error: nil, completed: nil)
        return self
    }

    /// Attaches an error handler as a side effect and returns this same signal.
    public func error(error: (E->Void)) -> Signal<T,E> {
        observe(next: nil, error: error, completed: nil)
        return self
    }

    /// Attaches a completion handler as a side effect and returns this same signal.
    public func completed(completed: (()->Void)) -> Signal<T,E> {
        observe(next: nil, error: nil, completed: completed)
        return self
    }

    /// Like `next(_:)`, for handlers that do not need the value.
    public func onNext(next: (Void->Void)) -> Signal<T,E> {
        observe(
            next: { _ in
                next()
            },
            error: nil,
            completed: nil)
        return self
    }

    /// Like `error(_:)`, for handlers that do not need the error.
    public func onError(error: (Void->Void)) -> Signal<T,E> {
        observe(
            next: nil,
            error: { _ in
                error()
            },
            completed: nil)
        return self
    }

    // MARK: Operators (each returns a new signal fed by this one)

    /// Returns a signal carrying `transform` applied to each value.
    public func map<U>(transform: T -> U) -> Signal<U,E> {
        let (result, pipe) = Signal<U,E>.create()
        self.observe(
            next: { value in
                pipe.sendNext(transform(value))
            },
            error: pipe.sendError,
            completed: pipe.sendCompleted)
        return result
    }

    /// Returns a signal of running accumulations: each incoming value is
    /// combined with the previous accumulation (starting from `initial`) and
    /// the new accumulation is sent.
    public func reduce<U>(initial: U, combine: (U,T) -> U) -> Signal<U,E> {
        var prev = initial
        let (result, pipe) = Signal<U,E>.create()
        self.observe(
            next: { value in
                prev = combine(prev, value)
                pipe.sendNext(prev)
            },
            error: pipe.sendError,
            completed: pipe.sendCompleted)
        return result
    }

    /// Returns a signal that forwards only the values satisfying `predicate`.
    public func filter(predicate: T -> Bool) -> Signal<T,E> {
        let (result, pipe) = Signal<T,E>.create()
        self.observe(
            next: { value in
                if predicate(value) {
                    pipe.sendNext(value)
                }
            },
            error: pipe.sendError,
            completed: pipe.sendCompleted)
        return result
    }

    /// Returns a signal of `(previous, current)` pairs; `initial` stands in as
    /// the previous value for the first pair.
    public func combinePrevious(initial: T) -> Signal<(T,T),E> {
        var prev = initial
        let (result, pipe) = Signal<(T,T),E>.create()
        self.observe(
            next: { value in
                pipe.sendNext((prev, value))
                prev = value
            },
            error: pipe.sendError,
            completed: pipe.sendCompleted)
        return result
    }

    /// Returns a signal that drops a value when `equal` reports it equal to
    /// the value immediately before it.
    public func skipRepeats(equal: (T, T) -> Bool) -> Signal<T,E> {
        return self
            .map {Optional($0)}
            // nil marks "no previous value", so the first value always passes.
            .combinePrevious(nil)
            .filter { (a, b) in
                if let a = a, b = b where equal(a, b) {
                    return false
                } else {
                    return true
                }
            }
            // The second element is the current value, which is never nil here.
            .map { $0.1! }
    }

    /// Returns a signal that sends at most one value per `delay` seconds: the
    /// first value of a burst arms a timer on `scheduler`, and when it fires
    /// the most recent value received so far is sent. On error or completion
    /// any pending value is flushed first, then the terminal event is sent;
    /// both happen on `scheduler`.
    public func throttle(delay: Double, onScheduler scheduler: Scheduler = Scheduler.mainQueueScheduler) -> Signal<T,E> {
        let (result, pipe) = Signal<T,E>.create()
        // Latest value not yet sent; nil means no timer is pending.
        let last: Atomic<T?> = Atomic(nil)
        self.observe(
            next: { value in
                let old = last.swap(value)
                if old == nil {
                    scheduler.after(delay) { () -> Void in
                        if let value = last.swap(nil) {
                            pipe.sendNext(value)
                        }
                    }
                }
            },
            error: { error in
                scheduler.perform {
                    if let value = last.swap(nil) {
                        pipe.sendNext(value)
                    }
                    pipe.sendError(error)
                }
            },
            completed: {
                scheduler.perform {
                    if let value = last.swap(nil) {
                        pipe.sendNext(value)
                    }
                    pipe.sendCompleted()
                }
            }
        )
        return result
    }

    // MARK: Creation

    /// Creates a signal together with the pipe that feeds it. The pipe's
    /// closures are bound methods of the signal, so the pipe keeps the signal
    /// alive.
    public static func create() -> (Signal<T,E>, Pipe<T,E>) {
        let signal = Signal<T,E>()
        let pipe = Pipe(
            sendNext: signal.sendNext,
            sendError: signal.sendError,
            sendCompleted: signal.sendCompleted)
        return (signal, pipe)
    }

    deinit {
        // Observers of a signal that goes away without terminating still get
        // a completion event.
        sendCompleted()
    }
}
public extension Signal where T: Equatable {
    /// Returns a signal that drops a value equal (by `==`) to the value
    /// immediately before it.
    public func skipRepeats() -> Signal<T,E> {
        return skipRepeats { previous, current in previous == current }
    }
}
/// Lets generic constraints talk about "some Optional": a conforming type
/// exposes itself as an ordinary optional of its associated type `T`.
public protocol OptionalType {
    /// The wrapped type.
    typealias T
    /// The receiver viewed as a plain `T?`.
    var optional: T? { get }
}
extension Optional: OptionalType {
    /// The optional itself.
    public var optional: Wrapped? { return self }
}
public extension Signal where T: OptionalType {
    /// Returns a signal of the unwrapped non-nil values; nil values are dropped.
    public func ignoreNil() -> Signal<T.T,E> {
        let nonNil = filter { candidate in
            if candidate.optional == nil {
                return false
            }
            return true
        }
        // Every value reaching this stage passed the nil check above.
        return nonNil.map { present in present.optional! }
    }
}
public extension Signal where T: SequenceType {
    /// Returns a signal whose values are arrays made by applying `transform`
    /// to every element of each incoming sequence.
    public func mapElements<U>(transform: T.Generator.Element -> U) -> Signal<[U],E> {
        return self.map { (sequence: T) -> [U] in
            sequence.map(transform)
        }
    }

    /// Returns a signal whose values are arrays holding only the elements of
    /// each incoming sequence for which `includeElement` returns true.
    public func filterElements(includeElement: T.Generator.Element -> Bool) -> Signal<[T.Generator.Element],E> {
        return self.map { (sequence: T) -> [T.Generator.Element] in
            sequence.filter(includeElement)
        }
    }
}
public extension Signal where T: SequenceType, T.Generator.Element: Hashable {
    /// Calls `added` with the elements that appeared since the previous value,
    /// whenever there are any. Returns this same signal.
    public func addedElements(added: Set<T.Generator.Element> -> Void) -> Signal<T, E> {
        return changedElements { change in
            guard !change.added.isEmpty else { return }
            added(change.added)
        }
    }

    /// Calls `removed` with the elements that disappeared since the previous
    /// value, whenever there are any. Returns this same signal.
    public func removedElements(removed: Set<T.Generator.Element> -> Void) -> Signal<T, E> {
        return changedElements { change in
            guard !change.removed.isEmpty else { return }
            removed(change.removed)
        }
    }

    /// Compares each value's elements, as sets, with those of the value before
    /// it (an empty collection before the first value) and calls `changed`
    /// when anything was added or removed. The observation is attached as a
    /// side effect; the method returns this same signal.
    public func changedElements(changed: (added: Set<T.Generator.Element>, removed: Set<T.Generator.Element>) -> Void) -> Signal<T, E> {
        let snapshots: Signal<[T.Generator.Element], E> = mapElements { $0 }
        let transitions = snapshots.combinePrevious([])
        transitions.next { old, new in
            let before = Set(old)
            let after = Set(new)
            let addedElements = after.subtract(before)
            let removedElements = before.subtract(after)
            if addedElements.isEmpty && removedElements.isEmpty {
                return
            }
            changed(added: addedElements, removed: removedElements)
        }
        return self
    }
}
// MARK: -
// MARK: Cold Signal Type
/// A cold signal: a recipe that creates a fresh `Signal` and begins feeding
/// it each time it is started.
public struct SignalProducer<T,E> {
    /// The work performed on every start. It receives the pipe that feeds the
    /// newly created signal plus an optional disposable (`start` passes `nil`),
    /// and may return a disposable that cancels the work.
    public let initiate: (Pipe<T,E>, Disposable?) -> Disposable?

    // MARK: Starting

    /// Creates a new signal, lets `setup` attach observers to it, then runs
    /// `initiate` to begin feeding it.
    ///
    /// - returns: Whatever disposable `initiate` returns, possibly `nil`.
    public func start(@noescape setup: Signal<T,E> -> Void) -> Disposable? {
        let (signal, pipe) = Signal<T,E>.create(); setup(signal); return initiate(pipe, nil)
    }

    /// Starts the producer with the given handlers attached to its signal.
    public func startObserving(
        next next: (T->Void)? = nil,
        error: (E->Void)? = nil,
        completed: (()->Void)? = nil) -> Disposable? {
        return start() { signal in signal.observe(next: next, error: error, completed: completed) }
    }

    /// Starts the producer observing values only.
    public func startObservingNext(next: T->Void) -> Disposable? {
        return startObserving(next: next)
    }

    /// Starts the producer observing the error only.
    public func startObservingError(error: E->Void) -> Disposable? {
        return startObserving(error: error)
    }

    /// Starts the producer observing completion only.
    public func startObservingCompleted(completed: ()->Void) -> Disposable? {
        return startObserving(completed: completed)
    }

    // MARK: Operators

    /// Creates a new producer which applies the given transform to every
    /// signal it initiates.
    ///
    /// Disposing a start of the returned producer also disposes the underlying
    /// start of this producer, so cancellation reaches the original source.
    public func lift<U,F>(transform: Signal<T,E> -> Signal<U,F>) -> SignalProducer<U,F> {
        return SignalProducer<U,F> { pipe, disposable in
            let upstream = self.start { signal in
                transform(signal)
                    .observe(
                        next: pipe.sendNext,
                        error: pipe.sendError,
                        completed: pipe.sendCompleted)
            }
            // Nothing to cancel upstream: hand back what we were given.
            if upstream == nil { return disposable }
            return Disposable {
                upstream?.dispose()
                disposable?.dispose()
            }
        }
    }

    /// Producer counterpart of `Signal.map`.
    public func map<U>(transform: T -> U) -> SignalProducer<U,E> {
        return lift { signal in signal.map(transform) }
    }

    /// Producer counterpart of `Signal.reduce`.
    public func reduce<U>(initial: U, combine: (U,T) -> U) -> SignalProducer<U,E> {
        return lift { signal in signal.reduce(initial, combine: combine) }
    }

    /// Producer counterpart of `Signal.filter`.
    public func filter(predicate: T -> Bool) -> SignalProducer<T,E> {
        return lift { signal in signal.filter(predicate) }
    }

    /// Producer counterpart of `Signal.combinePrevious`.
    public func combinePrevious(initial: T) -> SignalProducer<(T,T),E> {
        return lift { signal in signal.combinePrevious(initial) }
    }

    /// Producer counterpart of `Signal.skipRepeats`.
    public func skipRepeats(equal: (T, T) -> Bool) -> SignalProducer<T,E> {
        return lift { signal in signal.skipRepeats(equal) }
    }

    /// Producer counterpart of `Signal.throttle`.
    public func throttle(duration: Double, onScheduler scheduler: Scheduler = Scheduler.mainQueueScheduler) -> SignalProducer<T,E> {
        return lift { signal in signal.throttle(duration, onScheduler: scheduler) }
    }

    // MARK: Constant producers

    /// A producer that sends `value` and then completes.
    public static func next(value: T) -> SignalProducer<T, E> {
        return SignalProducer<T,E> { pipe, disposable in
            pipe.sendNext(value)
            pipe.sendCompleted()
            return disposable
        }
    }

    /// A producer that immediately fails with `error`.
    public static func error(error: E) -> SignalProducer<T,E> {
        return SignalProducer<T,E> { pipe, disposable in
            pipe.sendError(error)
            return disposable
        }
    }

    /// A producer that sends every element of `array` in order and then completes.
    public static func array(array: [T]) -> SignalProducer<T,E> {
        return SignalProducer<T,E> { pipe, disposable in
            for item in array { pipe.sendNext(item) }
            pipe.sendCompleted()
            return disposable
        }
    }
}
/// How long `retry` waits before starting the next attempt.
public enum RetryInterval {
    /// Always wait the same number of seconds. Payload: the delay.
    case Constant(Double)
    /// Wait `base` raised to the retry number, capped at `max`.
    /// Payload: base, max.
    case Exponential(Double, Double)
}
public extension SignalProducer {
    /// Returns a producer that restarts this producer when it fails.
    ///
    /// Values and completion of an attempt are forwarded on `scheduler`. When
    /// an attempt fails and fewer than `times` retries have been made, the
    /// next attempt is started on `scheduler` after the delay given by
    /// `interval`; otherwise the error is forwarded.
    ///
    /// - parameter times: Maximum number of retries after the first attempt.
    /// - parameter interval: Delay policy between attempts.
    /// - parameter scheduler: Where forwarding and re-attempts happen.
    /// - returns: A producer whose disposable stops forwarding, disposes the
    ///   most recently started attempt, and prevents further attempts.
    public func retry(times: Int, interval: RetryInterval, onScheduler scheduler: Scheduler = Scheduler.mainQueueScheduler) -> SignalProducer<T,E> {
        return SignalProducer { pipe, disposable in
            let cancelled = Atomic(false)
            // Disposable of the most recently started attempt, so that
            // cancelling reaches the underlying work instead of merely
            // muting its events.
            let inFlight: Atomic<Disposable?> = Atomic(nil)

            // Seconds to wait before retry number `retry` (1-based).
            func delayBefore(retry: Int) -> Double {
                switch interval {
                case let .Constant(delay):
                    return delay
                case let .Exponential(base, maximum):
                    return min(pow(base, Double(retry)), maximum)
                }
            }

            func attemptStart(attempt: Int, times: Int) {
                if cancelled.value { return }
                // NOTE: the following may be received some time
                // after attemptStart() is called, so in each we test
                // for cancelled state again before proceeding with actions
                let started = self.start { signal in
                    signal.observe(
                        next: { value in
                            if cancelled.value { return }
                            scheduler.perform {
                                pipe.sendNext(value)
                            }
                        },
                        error: { error in
                            if cancelled.value { return }
                            if attempt >= times {
                                scheduler.perform {
                                    pipe.sendError(error)
                                }
                            } else {
                                scheduler.after(delayBefore(attempt + 1)) {
                                    attemptStart(attempt + 1, times: times)
                                }
                            }
                        },
                        completed: {
                            if cancelled.value { return }
                            scheduler.perform {
                                pipe.sendCompleted()
                            }
                        })
                }
                inFlight.value = started
                // Cancellation may have arrived while the attempt was being
                // started; make sure that attempt does not keep running.
                if cancelled.value {
                    inFlight.swap(nil)?.dispose()
                }
            }

            attemptStart(0, times: times)
            return Disposable {
                cancelled.value = true
                inFlight.swap(nil)?.dispose()
                disposable?.dispose()
            }
        }
    }
}
public extension SignalProducer where T: Equatable {
    /// Returns a producer whose signals drop a value equal (by `==`) to the
    /// value immediately before it.
    public func skipRepeats() -> SignalProducer<T,E> {
        return lift { (source: Signal<T, E>) -> Signal<T, E> in
            source.skipRepeats(==)
        }
    }
}
public extension SignalProducer where T: OptionalType {
    /// Returns a producer whose signals carry only the unwrapped non-nil values.
    public func ignoreNil() -> SignalProducer<T.T,E> {
        return lift { (source: Signal<T, E>) -> Signal<T.T, E> in
            source.ignoreNil()
        }
    }
}
public extension SignalProducer where T: SequenceType {
    /// Producer counterpart of `Signal.mapElements`.
    public func mapElements<U>(transform: T.Generator.Element -> U) -> SignalProducer<[U],E> {
        return lift { (source: Signal<T, E>) -> Signal<[U], E> in
            source.mapElements(transform)
        }
    }

    /// Producer counterpart of `Signal.filterElements`.
    public func filterElements(includeElement: T.Generator.Element -> Bool) -> SignalProducer<[T.Generator.Element],E> {
        return lift { (source: Signal<T, E>) -> Signal<[T.Generator.Element], E> in
            source.filterElements(includeElement)
        }
    }
}
// NOTE: the following three operators may not behave as intended: the underlying
// Signal methods attach side effects to the signal they are called on and return
// that same signal, rather than returning a transformation of it.
public extension SignalProducer where T: SequenceType, T.Generator.Element: Hashable {
    /// Producer counterpart of `Signal.addedElements`.
    public func addedElements(added: Set<T.Generator.Element> -> Void) -> SignalProducer<T, E> {
        return lift { (source: Signal<T, E>) -> Signal<T, E> in
            source.addedElements(added)
        }
    }

    /// Producer counterpart of `Signal.removedElements`.
    public func removedElements(removed: Set<T.Generator.Element> -> Void) -> SignalProducer<T, E> {
        return lift { (source: Signal<T, E>) -> Signal<T, E> in
            source.removedElements(removed)
        }
    }

    /// Producer counterpart of `Signal.changedElements`.
    public func changedElements(changed: (added: Set<T.Generator.Element>, removed: Set<T.Generator.Element>) -> Void) -> SignalProducer<T, E> {
        return lift { (source: Signal<T, E>) -> Signal<T, E> in
            source.changedElements(changed)
        }
    }
}
/// A signal bundled with the pipe that feeds it, for code that needs to own
/// both ends.
public struct SignalSource<T,E> {
    /// The sending end.
    public var pipe: Pipe<T,E>
    /// The observing end.
    public var signal: Signal<T,E>

    /// Creates a fresh signal/pipe pair.
    public init() {
        let created = Signal<T,E>.create()
        self.signal = created.0
        self.pipe = created.1
    }

    /// Pushes `value` into the signal.
    public func sendNext(value: T) {
        let forward = pipe.sendNext
        forward(value)
    }

    /// Terminates the signal with `error`.
    public func sendError(error: E) {
        let forward = pipe.sendError
        forward(error)
    }

    /// Completes the signal.
    public func sendCompleted() {
        let forward = pipe.sendCompleted
        forward()
    }
}
/// A mutable value that announces every assignment to the signals started
/// from its `signal` producer.
public final class Observable<T> {
    /// Pipes of all currently started signals.
    private var pipes = TaggedArray<Pipe<T,Void>>()

    /// The current value; every assignment is sent to all started signals.
    public var value: T {
        didSet {
            pipes.forEach { $0.sendNext(self.value) }
        }
    }

    /// - parameter value: The initial value.
    public init(_ value: T) {
        self.value = value
    }

    deinit {
        pipes.forEach { $0.sendCompleted() }
    }

    /// A producer whose signals first receive the current value and then every
    /// later assignment. The observable is captured weakly; disposing a start
    /// unregisters its pipe.
    public var signal: SignalProducer<T,Void> {
        return SignalProducer { [weak self] pipe, disposable in
            guard let owner = self else { return disposable }
            pipe.sendNext(owner.value)
            let tag = owner.pipes.append(pipe)
            return Disposable {
                self?.pipes.removeTag(tag)
                disposable?.dispose()
            }
        }
    }
}
prefix operator ^ {}

/// Reads an observable: `^observable` is `observable.value`.
public prefix func ^<T>(rhs: Observable<T>) -> T {
    return rhs.value
}
/// Writes an observable: `observable ^= x` assigns `x` to `observable.value`.
public func ^=<T>(lhs: Observable<T>, rhs: T) {
    lhs.value = rhs
}
// MARK: -
// MARK: Foundation Extensions
public extension NSNotificationCenter {
    /// Returns a producer that, when started, registers a block observer with
    /// the given name/object/queue and sends each notification it receives.
    /// Disposing the start removes the observer.
    public func signal(name name: String? = nil, object: AnyObject? = nil, queue: NSOperationQueue? = nil) -> SignalProducer<NSNotification, Void> {
        return SignalProducer { pipe, disposable in
            let token = self.addObserverForName(name, object: object, queue: queue, usingBlock: { notification in
                pipe.sendNext(notification)
            })
            return Disposable {
                self.removeObserver(token)
                disposable?.dispose()
            }
        }
    }
}
/// Error sent when a data task yields neither a data/response pair nor an error.
private let defaultSessionError = NSError(domain: "Reactive.NSURLSession.signal", code: 1, userInfo: nil)

public extension NSURLSession {
    /// Convenience for `signal(request:)` with a plain request for `URL`.
    public func signal(URL: NSURL) -> SignalProducer<(data: NSData, response: NSURLResponse), NSError> {
        let request = NSURLRequest(URL: URL)
        return signal(request: request)
    }

    /// Returns a producer that, when started, runs a data task for `request`.
    /// When the task delivers both data and a response, that pair is sent and
    /// the signal completes; otherwise the task's error (or a default error)
    /// is sent. Disposing the start cancels the task.
    public func signal(request request: NSURLRequest) -> SignalProducer<(data: NSData, response: NSURLResponse), NSError> {
        return SignalProducer { pipe, disposable in
            let task = self.dataTaskWithRequest(request) { data, response, error in
                guard let data = data, response = response else {
                    pipe.sendError(error ?? defaultSessionError)
                    return
                }
                pipe.sendNext((data: data, response: response))
                pipe.sendCompleted()
            }
            task.resume()
            return Disposable {
                task.cancel()
                disposable?.dispose()
            }
        }
    }
}
// MARK: -
// MARK: Sample Usage
// Example 1: fetch a page with retries, keep only large responses, and log
// whatever arrives.
let request = NSURLRequest(URL: NSURL(string: "http://www.apple.com")!)
NSURLSession.sharedSession().signal(request: request)
.retry(5, interval: .Constant(15)) // retry fetch 5 times every 15 sec
.filter { result in result.data.length > 5000 } // skip if data is less than 5k
.startObserving(
next: { result in log("got \(result.data.length) bytes") },
error: { error in log("got \(error)") },
completed: { log("got completed") }
)//?.dispose() // dispose will cancel the lot if desired
// Example 2: send a fixed list of strings through a throttle and log the
// values that come out, followed by completion.
SignalProducer<String,Void>.array(["foo", "bar", "baz", "buq"])
.throttle(0.1) // throttle results every 0.1sec.
.start() {
$0 .next { log("next \($0)") }
.completed { log("completed") }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment