Skip to content

Instantly share code, notes, and snippets.

@markmals
Last active November 12, 2024 16:46
Show Gist options
  • Save markmals/e880043a5f59436b2cc581f9692e6fd6 to your computer and use it in GitHub Desktop.
Save markmals/e880043a5f59436b2cc581f9692e6fd6 to your computer and use it in GitHub Desktop.
Vanilla Reactive System
// Credit Ryan Carniato: https://frontendmasters.com/courses/reactivity-solidjs/
// & Marc Grabanski: https://gist.github.com/1Marc/09e739caa6a82cc176ab4c2abd691814
extension Array where Element == Observer {
    /// The observer on top of the context stack (the array's last element),
    /// or `nil` when the stack is empty.
    fileprivate var current: Observer? {
        return self.last
    }
}
/// Wraps one side effect and remembers which signal subscription sets it is registered in.
private final class Observer {
    // FIXME: This is not thread safe
    // This is usually implemented as a thread local value in multi-threaded languages:
    // - Example in Swift: https://github.com/unixzii/swift-signal/blob/main/Sources/SwiftSignal/Scope.swift#L8
    // - Example in Rust: https://github.com/leptos-rs/leptos/blob/main/reactive_graph/src/graph/subscriber.rs#L5-L7
    /// Stack of observers whose side effects are currently running; the last entry is the active one.
    static var context: [Observer] = []

    /// The closure to re-run; a no-op until `registerEffect(_:)` is called.
    private var sideEffect: () -> Void = {}
    /// Subscription sets (one per observed signal) this observer was inserted into during its last run.
    private var dependencies = Set<ReferenceSet<Observer>>()

    /// Stores the closure that `execute()` will run.
    func registerEffect(_ executor: @escaping () -> Void) {
        self.sideEffect = executor
    }

    /// Runs the side effect while this observer is on top of the context stack.
    func execute() {
        // Drop the subscriptions from the previous run so only the signals read now are tracked.
        cleanup()
        // Become the active observer so signals read by the side effect subscribe us.
        Observer.context.append(self)
        // Pop ourselves again once the side effect has returned.
        defer { Observer.context.removeLast() }
        sideEffect()
    }

    /// Subscribes this observer to `signal` and records the subscription set for later cleanup.
    func observe<T>(_ signal: Signal<T>) {
        let subscribers = signal.subscriptions
        subscribers.insert(self)
        dependencies.insert(subscribers)
    }

    /// Removes this observer from every recorded subscription set, then forgets them.
    private func cleanup() {
        dependencies.forEach { $0.remove(self) }
        dependencies.removeAll()
    }
}
// Observers are compared and hashed by object identity.
extension Observer: Hashable {
    /// Two observers are equal only when they are the same instance.
    static func == (lhs: Observer, rhs: Observer) -> Bool {
        return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
    }

    /// Hashes the instance's `ObjectIdentifier`.
    func hash(into hasher: inout Hasher) {
        ObjectIdentifier(self).hash(into: &hasher)
    }
}
/// A reactive atomic value.
///
/// Reading `value` while an effect is running subscribes that effect to this signal;
/// writing `value` re-runs every subscribed effect.
public final class Signal<T> {
    /// Observers currently subscribed to this signal.
    fileprivate var subscriptions = ReferenceSet<Observer>()
    private var storage: T

    public init(_ initialValue: T) {
        storage = initialValue
    }

    public var value: T {
        get {
            // If an observer is on top of the context stack, subscribe it to this signal.
            if let active = Observer.context.current {
                active.observe(self)
            }
            return storage
        }
        set {
            storage = newValue
            // Re-run the side effect of every observer subscribed to this signal.
            subscriptions.forEach { $0.execute() }
        }
    }
}
/// Registers a reactive side effect.
///
/// Signals read inside `sideEffectFunction` are observed, and `sideEffectFunction`
/// is re-run whenever one of them is updated.
public func effect(_ sideEffectFunction: @escaping () -> Void) {
    let tracker = Observer()
    tracker.registerEffect(sideEffectFunction)
    // The first run happens right away so the signals it reads get recorded.
    tracker.execute()
}
/// Caches the result of `cachedExpression` in an internal signal.
///
/// The cache is refreshed by an effect, so it is recomputed when a signal read inside
/// `cachedExpression` is updated.
/// - Returns: A read-only accessor closure for the cached value.
public func memoize<T>(_ cachedExpression: @escaping @autoclosure () -> T) -> (() -> T) {
    // Starts out as `nil`; the effect below runs immediately and fills it in.
    let cache = Signal<T?>(nil)
    effect {
        cache.value = cachedExpression()
    }
    let accessor: () -> T = { cache.value! }
    return accessor
}
/// Evaluates `nonReactiveReadsFn` with an empty observer context, so signals read inside it
/// are not subscribed to by the surrounding effect.
/// - Returns: Whatever `nonReactiveReadsFn` returns.
public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
    let savedStack = Observer.context
    Observer.context = []
    // Put the saved stack back once the closure has produced its value.
    defer { Observer.context = savedStack }
    return nonReactiveReadsFn()
}
// MARK: Usage Examples
// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// Runs once immediately, then again whenever a signal it read is updated.
effect {
    print("2 × \(count.value) is \(doubleCount())")
}

// Five increments (0...4), two seconds apart, so `count` ends at 5.
for _ in 0...4 {
    // The duration-based sleep requires the `for:` argument label.
    try? await Task.sleep(for: .seconds(2))
    count.value = count.value + 1
}

// Intended output, one line per value of `count`:
// 2 × 0 is 0
// 2 × 1 is 2
// 2 × 2 is 4
// 2 × 3 is 6
// 2 × 4 is 8
// 2 × 5 is 10
//
// NOTE: the effect subscribes to both `count` and the memoized signal, and updates are not
// batched, so a run may also print intermediate lines with a stale product (e.g. "2 × 1 is 0").
// A reference type Set
//
// Wraps a `Set` in a class so that several owners can share and mutate the same collection.
//
// Equality and hashing use object identity, not the current contents. Instances are stored
// inside `Set<ReferenceSet<...>>` while their contents keep changing; content-based hashing
// would (a) change the hash of an element that is already a set member and (b) make two
// distinct instances that happen to hold the same elements collapse into one entry.
private final class ReferenceSet<Element: Hashable>: Hashable, Collection {
    typealias Element = Element
    typealias Iterator = Set<Element>.Iterator
    typealias Index = Set<Element>.Index
    typealias Indices = Set<Element>.Indices
    typealias SubSequence = Set<Element>.SubSequence

    /// The wrapped value-type set.
    private var inner = Set<Element>()

    // MARK: Collection (everything forwards to `inner`)

    func makeIterator() -> Iterator {
        inner.makeIterator()
    }

    var startIndex: Index {
        inner.startIndex
    }

    var endIndex: Index {
        inner.endIndex
    }

    var indices: Indices {
        inner.indices
    }

    func index(after i: Index) -> Index {
        inner.index(after: i)
    }

    subscript(position: Index) -> Element {
        inner[position]
    }

    subscript(bounds: Range<Index>) -> SubSequence {
        inner[bounds]
    }

    // MARK: Mutation

    /// Adds `newMember` to the set; does nothing if it is already present.
    func insert(_ newMember: Element) {
        inner.insert(newMember)
    }

    /// Removes `member` from the set; does nothing if it is not present.
    func remove(_ member: Element) {
        inner.remove(member)
    }

    // MARK: Hashable (identity based)

    /// Two reference sets are equal only when they are the same instance.
    static func == (lhs: ReferenceSet, rhs: ReferenceSet) -> Bool {
        lhs === rhs
    }

    /// Hashes the instance identity, which stays stable while the contents change.
    func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(self))
    }
}
@mattmassicotte
Copy link

I ran this code unmodified (except for adding the for: label to the sleep) with Swift 5, and got this output:

2 × 0 is 0
2 × 1 is 0
2 × 1 is 2
2 × 2 is 2
2 × 2 is 4
2 × 3 is 4
2 × 3 is 6
2 × 4 is 6
2 × 4 is 8
2 × 5 is 8
2 × 5 is 10

Seems like there may be a bug? But regardless! I was able to modify this code to get it building with Swift 6. I didn't have to touch the ReferenceSet type.

What I had to do was internalize the global state into a single, unchecked type with a lock. Luckily, there was really no concurrency being used, and that's important because my locking technique will probably only work as long as Observer and Signal have no async methods. I used a lock from Foundation because it was easy. Swift 6 now includes a Mutex type that is cross-platform and nicer, but raises the minimum OS requirements and might not be what you want.

import Foundation

/// Lock-protected stack of the observers whose side effects are currently running.
///
/// `@unchecked Sendable`: all access to `context` goes through `lock`.
private final class ObserverContext: @unchecked Sendable {
    private var context: [Observer] = []
    // Must be recursive: `untrack` and `withCurrent` call back into user code while holding the
    // lock, and that code can read a signal, which calls `withCurrent` again on the same thread.
    // With a plain `NSLock` that nested acquisition deadlocks.
    private let lock = NSRecursiveLock()

    /// Pushes `observer` so it becomes the current observer.
    func append(_ observer: Observer) {
        lock.withLock {
            context.append(observer)
        }
    }

    /// Pops the most recently pushed observer.
    func removeLast() {
        lock.withLock {
            _ = context.removeLast()
        }
    }

    /// Calls `block` with the observer on top of the stack, or `nil` if the stack is empty.
    /// The lock is held for the duration of `block`.
    func withCurrent(_ block: (Observer?) -> Void) {
        lock.withLock {
            let last = context.last

            block(last)
        }
    }

    /// Runs `nonReactiveReadsFn` with an empty stack and restores the previous stack afterwards.
    /// The lock is held for the duration of the closure.
    /// - Returns: Whatever `nonReactiveReadsFn` returns.
    public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
        lock.withLock {
            let prevContext = context
            self.context = []
            // Restore even if more exits are added to this closure later.
            defer { self.context = prevContext }

            return nonReactiveReadsFn()
        }
    }
}

/// Wraps one side effect and remembers which signal subscription sets it is registered in.
private final class Observer {
    /// Shared, lock-protected stack of currently running observers.
    static let context = ObserverContext()

    /// The closure to re-run; a no-op until `registerEffect(_:)` is called.
    private var sideEffect: () -> Void = {}
    /// Subscription sets this observer was inserted into during its last run.
    private var dependencies = Set<ReferenceSet<Observer>>()

    /// Stores the closure that `execute()` will run.
    func registerEffect(_ executor: @escaping () -> Void) {
        self.sideEffect = executor
    }

    /// Runs the side effect while this observer is on top of the context stack.
    func execute() {
        // Forget the previous run's subscriptions so only signals read now are tracked.
        cleanup()
        let stack = Self.context
        // Become the current observer so signals read by the side effect subscribe us.
        stack.append(self)
        // Pop ourselves once the side effect has returned.
        defer { stack.removeLast() }
        sideEffect()
    }

    /// Subscribes this observer to `signal` and records the subscription set for cleanup.
    func observe<T>(_ signal: Signal<T>) {
        let subscribers = signal.subscriptions
        subscribers.insert(self)
        dependencies.insert(subscribers)
    }

    /// Removes this observer from every recorded subscription set, then forgets them.
    private func cleanup() {
        dependencies.forEach { $0.remove(self) }
        dependencies.removeAll()
    }
}

// Observers are compared and hashed by object identity.
extension Observer: Hashable {
    /// Two observers are equal only when they are the same instance.
    static func == (lhs: Observer, rhs: Observer) -> Bool {
        return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
    }

    /// Hashes the instance's `ObjectIdentifier`.
    func hash(into hasher: inout Hasher) {
        ObjectIdentifier(self).hash(into: &hasher)
    }
}

/// A reactive atomic value.
///
/// Reading `value` while an effect is running subscribes that effect to this signal;
/// writing `value` re-runs every subscribed effect.
public final class Signal<T> {
    /// Observers currently subscribed to this signal.
    fileprivate var subscriptions = ReferenceSet<Observer>()
    private var storage: T

    public init(_ initialValue: T) {
        storage = initialValue
    }

    public var value: T {
        get {
            // Subscribe the current observer, if there is one, to this signal.
            Observer.context.withCurrent { active in
                if let active {
                    active.observe(self)
                }
            }

            return storage
        }
        set {
            storage = newValue
            // Re-run the side effect of every observer subscribed to this signal.
            subscriptions.forEach { $0.execute() }
        }
    }
}

/// Registers a reactive side effect.
///
/// Signals read inside `sideEffectFunction` are observed, and `sideEffectFunction`
/// is re-run whenever one of them is updated.
public func effect(_ sideEffectFunction: @escaping () -> Void) {
    let tracker = Observer()
    tracker.registerEffect(sideEffectFunction)
    // Run once right away so the signals it reads get recorded.
    tracker.execute()
}

/// Caches the result of `cachedExpression` in an internal signal that an effect keeps up to date.
/// - Returns: A read-only accessor closure for the cached value.
public func memoize<T>(_ cachedExpression: @escaping @autoclosure () -> T) -> (() -> T) {
    // Starts out as `nil`; the effect below runs immediately and fills it in.
    let cache = Signal<T?>(nil)
    effect {
        cache.value = cachedExpression()
    }
    let accessor: () -> T = { cache.value! }
    return accessor
}

/// Evaluates `nonReactiveReadsFn` without tracking the signals read inside it.
/// Forwards to the shared `ObserverContext`.
/// - Returns: Whatever `nonReactiveReadsFn` returns.
public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
    let sharedContext = Observer.context
    return sharedContext.untrack(nonReactiveReadsFn)
}

// MARK: Usage Examples

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// Runs once immediately, then again whenever a signal it read is updated.
effect {
    let current = count.value
    let doubled = doubleCount()
    print("2 × \(current) is \(doubled)")
}

// Five increments, two seconds apart.
for _ in 0..<5 {
    try? await Task.sleep(for: .seconds(2))
    count.value += 1
}

@mattmassicotte
Copy link

mattmassicotte commented Aug 10, 2024

Wait wait wait. I just realized that the "last" observer is important, but now the state is global. There are no data races, but is it right? I think the execute method needs to be iterated on more to ensure this cannot be used incorrectly.

Basically, ObserverContext cannot have independent append and removeLast methods, it needs one single atomic operation that does both.

@mattmassicotte
Copy link

Ok second try. This ObserverContext class now cannot be used in that incorrect way. I also had to move to a recursive lock, because I wasn't sure if a side effect could itself result in observation changes.

import Foundation

/// Lock-protected stack of the observers whose side effects are currently running.
///
/// `@unchecked Sendable`: all access to `context` goes through the recursive `lock`.
private final class ObserverContext: @unchecked Sendable {
    private var context: [Observer] = []
    private let lock = NSRecursiveLock()

    /// Runs `block` with `observer` pushed as the current observer, popping it afterwards.
    /// The lock is held for the whole call, so push, run and pop form one unit.
    func setCurrent(_ observer: Observer, _ block: () -> Void) {
        lock.lock()
        defer { lock.unlock() }

        context.append(observer)
        // Runs before the unlock above (defers execute in reverse order).
        defer { context.removeLast() }

        block()
    }

    /// Calls `block` with the observer on top of the stack, or `nil` if the stack is empty.
    func withCurrent(_ block: (Observer?) -> Void) {
        lock.lock()
        defer { lock.unlock() }

        block(context.last)
    }

    /// Runs `nonReactiveReadsFn` with an empty stack and restores the previous stack afterwards.
    /// - Returns: Whatever `nonReactiveReadsFn` returns.
    public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
        lock.lock()
        defer { lock.unlock() }

        let savedStack = context
        context = []
        defer { context = savedStack }

        return nonReactiveReadsFn()
    }
}

/// Wraps one side effect and remembers which signal subscription sets it is registered in.
private final class Observer {
    /// Shared, lock-protected stack of currently running observers.
    static let context = ObserverContext()

    /// The closure to re-run; a no-op until `registerEffect(_:)` is called.
    private var sideEffect: () -> Void = {}
    /// Subscription sets this observer was inserted into during its last run.
    private var dependencies = Set<ReferenceSet<Observer>>()

    /// Stores the closure that `execute()` will run.
    func registerEffect(_ executor: @escaping () -> Void) {
        self.sideEffect = executor
    }

    /// Runs the side effect with this observer installed as the current one.
    func execute() {
        // Forget the previous run's subscriptions so only signals read now are tracked.
        cleanup()
        // `setCurrent` pushes us, runs the closure, and pops us again.
        Observer.context.setCurrent(self) {
            self.sideEffect()
        }
    }

    /// Subscribes this observer to `signal` and records the subscription set for cleanup.
    func observe<T>(_ signal: Signal<T>) {
        let subscribers = signal.subscriptions
        subscribers.insert(self)
        dependencies.insert(subscribers)
    }

    /// Removes this observer from every recorded subscription set, then forgets them.
    private func cleanup() {
        dependencies.forEach { $0.remove(self) }
        dependencies.removeAll()
    }
}

// Observers are compared and hashed by object identity.
extension Observer: Hashable {
    /// Two observers are equal only when they are the same instance.
    static func == (lhs: Observer, rhs: Observer) -> Bool {
        return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
    }

    /// Hashes the instance's `ObjectIdentifier`.
    func hash(into hasher: inout Hasher) {
        ObjectIdentifier(self).hash(into: &hasher)
    }
}

/// A reactive atomic value.
///
/// Reading `value` while an effect is running subscribes that effect to this signal;
/// writing `value` re-runs every subscribed effect.
public final class Signal<T> {
    /// Observers currently subscribed to this signal.
    fileprivate var subscriptions = ReferenceSet<Observer>()
    private var storage: T

    public init(_ initialValue: T) {
        storage = initialValue
    }

    public var value: T {
        get {
            // Subscribe the current observer, if there is one, to this signal.
            Observer.context.withCurrent { active in
                guard let active else { return }
                active.observe(self)
            }

            return storage
        }
        set {
            storage = newValue
            // Re-run the side effect of every observer subscribed to this signal.
            for subscriber in subscriptions {
                subscriber.execute()
            }
        }
    }
}

/// Registers a reactive side effect.
///
/// Signals read inside `sideEffectFunction` are observed, and `sideEffectFunction`
/// is re-run whenever one of them is updated.
public func effect(_ sideEffectFunction: @escaping () -> Void) {
    let tracker = Observer()
    tracker.registerEffect(sideEffectFunction)
    // Run once right away so the signals it reads get recorded.
    tracker.execute()
}

/// Caches the result of `cachedExpression` in an internal signal that an effect keeps up to date.
/// - Returns: A read-only accessor closure for the cached value.
public func memoize<T>(_ cachedExpression: @escaping @autoclosure () -> T) -> (() -> T) {
    // Starts out as `nil`; the effect below runs immediately and fills it in.
    let cache = Signal<T?>(nil)
    effect {
        cache.value = cachedExpression()
    }
    let accessor: () -> T = { cache.value! }
    return accessor
}

/// Evaluates `nonReactiveReadsFn` without tracking the signals read inside it.
/// Forwards to the shared `ObserverContext`.
/// - Returns: Whatever `nonReactiveReadsFn` returns.
public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
    let sharedContext = Observer.context
    return sharedContext.untrack(nonReactiveReadsFn)
}

// MARK: Usage Examples

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// Runs once immediately, then again whenever a signal it read is updated.
effect {
    let current = count.value
    let doubled = doubleCount()
    print("2 × \(current) is \(doubled)")
}

// Five increments, two seconds apart.
for _ in 1...5 {
    try? await Task.sleep(for: .seconds(2))
    let next = count.value + 1
    count.value = next
}

@markmals
Copy link
Author

markmals commented Aug 10, 2024

@mattmassicotte Okay I went down a bit of a Sendable rabbit hole with ThreadLocal and got this down to two errors with what I think is a better example that more accurately demonstrates possible multi-threaded usage. I feel like I'm probably over-using Mutex, but I wasn't sure how else to ensure sendability of everything without it. I could switch back to NSLock and @unchecked Sendable, but I would kind of prefer to use constructs that allow the compiler to check everything, if possible.

Update: Got it down to one error now, just the Mutex not being able to be stored in a Set because it's not Hashable.

import Synchronization
import Foundation

/// A handle to a value stored per thread in `Thread.current.threadDictionary`.
///
/// The handle itself holds only the dictionary key and the initial value, so copies of a
/// `ThreadLocal` all refer to the same per-thread slot.
///
/// A thread that has not yet written to the slot reads the value passed to `init`.
struct ThreadLocal<Value>: Sendable where Value: Sendable {
    private var threadDictionary: NSMutableDictionary {
        Thread.current.threadDictionary
    }

    /// Unique key of this slot in every thread's dictionary.
    private let key: String
    /// Value reported on threads that have not assigned `wrappedValue` yet.
    private let initialValue: Value?

    /// Creates a slot whose value starts out as `wrappedValue` on every thread.
    init(_ wrappedValue: Value?) {
        // A UUID cannot collide with another slot. The identity of a temporary object can be
        // reused once that object is deallocated.
        self.key = "ThreadLocal.\(UUID().uuidString)"
        // Nothing is written to the creating thread's dictionary here. Storing the value only
        // there would leave every other thread without it.
        self.initialValue = wrappedValue
    }

    /// The calling thread's value for this slot.
    var wrappedValue: Value? {
        get {
            // The box always wraps `Value?`, matching what the setter stores.
            guard let box = threadDictionary.object(forKey: key) as? Box<Value?> else {
                return initialValue
            }

            return box.wrappedValue
        }
        nonmutating set {
            if let box = threadDictionary.object(forKey: key) as? Box<Value?> {
                box.wrappedValue = newValue
            } else {
                // An explicit `nil` is stored too, so it overrides `initialValue` on this thread.
                threadDictionary.setObject(Box<Value?>(newValue), forKey: NSString(string: key))
            }
        }
    }
}

extension ThreadLocal {
    /// Reference wrapper so a value can be kept in, and mutated inside, the thread dictionary.
    private class Box<Wrapped> {
        var wrappedValue: Wrapped

        init(_ value: Wrapped) {
            wrappedValue = value
        }
    }
}

extension Array where Element == Observer {
    /// The observer on top of the context stack (the array's last element),
    /// or `nil` when the stack is empty.
    fileprivate var current: Observer? {
        return self.last
    }
}

// One Observer could possibly be referenced on multiple threads, because... [1]
//
// Wraps one side effect (behind a Mutex) plus the set of subscription sets it is registered in.
// NOTE(review): as the FIXME below says, this version does not compile as written.
private final class Observer: Sendable {
    // One context per thread
    static let context = ThreadLocal<[Observer]>([])

    // The closure to run; set once in `init`.
    private let sideEffect: Mutex<(() -> Void)>
    // FIXME: Mutex doesn't conform to Hashable... :(
    // Should I just use an NSLock here instead? I already have to use Foundation for ThreadLocal
    //
    // Swift Compiler Error: Type 'Mutex<Set<Observer>>' does not conform to protocol 'Hashable'
    private let dependencies = Mutex(Set<Mutex<Set<Observer>>>())
    
    init(_ effect: @Sendable @escaping () -> Void) {
        self.sideEffect = Mutex(effect)
    }

    // Runs the side effect while holding the `sideEffect` mutex.
    func execute() {
        sideEffect.withLock { effect in
            // Returns without running the effect when the calling thread has no context value.
            guard var ctx = Self.context.wrappedValue else { return }
            cleanup()
            // NOTE(review): `ctx` is a local copy; the append/removeLast below are never written
            // back to `Self.context.wrappedValue`, so signals read by `effect()` would not see this
            // observer as current. Confirm intent.
            ctx.append(self)
            // NOTE(review): the mutex is still held here; presumably an effect that re-triggers
            // itself would block on this lock. Verify.
            effect()
            ctx.removeLast()
        }
    }

    // Subscribes this observer to `signal` and records the subscription set for cleanup.
    func observe<T>(_ signal: Signal<T>) {
        dependencies.withLock {
            signal.subscriptions.withLock { $0.insert(self) }
            $0.insert(signal.subscriptions)
        }
    }

    // Removes this observer from every recorded subscription set, then forgets them.
    private func cleanup() {
        dependencies.withLock {
            for dependency in $0 {
                // NOTE(review): `dependency` is a Mutex here, so this presumably needs
                // `dependency.withLock { ... }`. Confirm.
                dependency.remove(self)
            }
            $0.removeAll()
        }
    }
}

// Observers are compared and hashed by object identity.
extension Observer: Hashable {
    /// Two observers are equal only when they are the same instance.
    static func == (lhs: Observer, rhs: Observer) -> Bool {
        return ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
    }

    /// Hashes the instance's `ObjectIdentifier`.
    func hash(into hasher: inout Hasher) {
        ObjectIdentifier(self).hash(into: &hasher)
    }
}

/// A reactive atomic value whose storage and subscriber set are each guarded by a `Mutex`.
///
/// Reading `value` subscribes the calling thread's current observer (if any);
/// writing `value` re-runs every subscribed observer.
///
/// No mutex is held while observer code runs. `Mutex` is not recursive, and an effect that
/// reads this signal while being re-run from the setter would otherwise block on a lock the
/// setter already holds.
public final class Signal<T>: Sendable where T: Sendable {
    // [1] ...a Signal can be referenced on multiple threads, and a Signal contains a reference to its Observers
    fileprivate let subscriptions = Mutex(Set<Observer>())
    private let storage: Mutex<T>

    public var value: T {
        get {
            // Subscribe before touching `storage`: `observe` takes other locks, and nesting
            // them inside the storage lock invites lock-ordering problems.
            Observer.context.wrappedValue?.current?.observe(self)
            return storage.withLock { $0 }
        }
        set {
            storage.withLock { $0 = newValue }
            // Take a snapshot of the subscribers and release the lock before running them,
            // so observers can read this signal and re-subscribe without deadlocking.
            let observers = subscriptions.withLock { $0 }
            for observer in observers {
                observer.execute()
            }
        }
    }

    public init(_ initialValue: T) {
        storage = Mutex(initialValue)
    }
}

/// Registers a reactive side effect: wraps `sideEffectFunction` in an observer and runs it once
/// immediately.
public func effect(_ sideEffectFunction: @Sendable @escaping () -> Void) {
    Observer(sideEffectFunction).execute()
}

/// A `@Sendable` closure that takes no arguments and returns a `T`; used as a read-only accessor.
public typealias Accessor<T> = @Sendable () -> T

/// Caches the result of `cachedExpression` in an internal signal that an effect keeps up to date.
/// - Returns: A read-only accessor for the cached value.
public func memoize<T: Sendable>(_ cachedExpression: @Sendable @escaping @autoclosure () -> T) -> Accessor<T> {
    // Starts out as `nil`; the effect below is executed immediately and assigns the first value.
    let cache = Signal<T?>(nil)
    effect {
        cache.value = cachedExpression()
    }
    return { cache.value! }
}

/// Evaluates `nonReactiveReadsFn` with an empty observer context on the calling thread, so
/// signals read inside it are not tracked, then restores the previous context.
/// - Returns: Whatever `nonReactiveReadsFn` returns.
public func untrack<T>(_ nonReactiveReadsFn: Accessor<T>) -> T {
    // Save the stack itself, not the `ThreadLocal` handle. A copy of the handle points at the
    // same per-thread slot, so reading it after the reset below would return the emptied
    // stack and nothing would be restored.
    let previousStack = Observer.context.wrappedValue
    Observer.context.wrappedValue = []
    defer { Observer.context.wrappedValue = previousStack }
    return nonReactiveReadsFn()
}

// MARK: Usage Example

/// Stand-in for an expensive computation: returns `number` squared.
func someExpensiveCalculation(_ number: Int) -> Int {
    let squared = number * number
    return squared
}

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// NOTE(review): top-level code is presumably main-actor isolated, so this Task may inherit that
// isolation rather than run in the background. Confirm.
Task {
    // Do expensive work on a background thread/thread pool/task
    let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
    
    effect {
        // Some sort of non-UI logging code that doesn't need to happen on the main thread
        // and can run in parallel to the UI effect
        print("doubleCountSquared changed: \(doubleCountSquared())")
    }
    
    Task { @MainActor in
        // Receive updates from the background thread on the main thread
        // to update the UI
        effect {
            print("Render UI: \(doubleCountSquared())")
        }
    }
}

// Pretend this is us responding to user input on the main thread
// NOTE(review): nothing awaits this Task or the one above; verify the program stays alive long
// enough for them to finish.
Task { @MainActor in
    // Four increments, two seconds apart.
    for _ in 0..<4 {
        try? await Task.sleep(for: .seconds(2))
        count.value += 1
    }
}

@mattmassicotte
Copy link

It's getting harder for me to offer good help here. I don't know how the system is supposed to behave! But, I still am extremely skeptical of using a thread-local unless I really understand why it is necessary.

Another problem is you have structured your use of Task here like how GCD queues might be used. But that is not how isolation works. Top-level swift code is isolated to the MainActor. So, your first Task is not actually in the background. And your other Tasks would all be MainActor even without your annotations.

Finally, doesn't this program terminate before finishing? There's nothing waiting on these tasks.

@markmals
Copy link
Author

markmals commented Aug 11, 2024

Ah I didn't realize that the top level was already MainActor isolated. In that case, would something like this do what I'm expecting?

// MARK: Usage Example

/// Stand-in for an expensive computation: returns `number` squared.
func someExpensiveCalculation(_ number: Int) -> Int {
    return number * number
}

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// NOTE(review): `priority: .background` sets the task's priority; whether it also moves the work
// off the main actor is not shown here. Confirm.
Task(priority: .background) {
    // Do expensive work on a background thread/thread pool/task
    let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
    
    effect {
        // Some sort of non-UI logging code that doesn't need to happen on the main thread
        // and can run in parallel to the UI effect
        print("doubleCountSquared changed: \(doubleCountSquared())")
    }
    
    Task { @MainActor in
        // Receive updates from the background thread on the main thread
        // to update the UI
        effect {
            print("Render UI: \(doubleCountSquared())")
        }
    }
}

// Pretend this is us responding to user input
// Four increments, two seconds apart, awaited directly at top level.
for _ in 0..<4 {
    try? await Task.sleep(for: .seconds(2))
    count.value += 1
}

Or do I really just want a nonisolated async function instead of a task at all?

// MARK: Usage Example

/// Stand-in for an expensive computation: returns `number` squared.
func someExpensiveCalculation(_ number: Int) -> Int {
    let factor = number
    return factor * factor
}

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// Sets up a derived memo plus two effects: one registered directly in this function, one
// registered from a main-actor task.
nonisolated func background() async {
    // Do expensive work on a background thread/thread pool/task
    let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
    
    effect {
        // Some sort of non-UI logging code that doesn't need to happen on the main thread
        // and can run in parallel to the UI effect
        print("doubleCountSquared changed: \(doubleCountSquared())")
    }
    
    // Unstructured task: not awaited by `background()`.
    Task { @MainActor in
        // Receive updates from the background thread on the main thread
        // to update the UI
        effect {
            print("Render UI: \(doubleCountSquared())")
        }
    }
}

// Start `background()` in an unstructured task without waiting for it here.
Task { await background() }

// Pretend this is us responding to user input
// Four increments, two seconds apart.
for _ in 0..<4 {
    try? await Task.sleep(for: .seconds(2))
    count.value += 1
}

Or a TaskGroup?

// MARK: Usage Example

/// Stand-in for an expensive computation: returns `number` squared.
func someExpensiveCalculation(_ number: Int) -> Int {
    return number * number
}

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// Runs the setup and the simulated user input inside one task group.
await withTaskGroup(of: Void.self) { group in
    group.addTask {
        // Do expensive work on a background thread/thread pool/task
        let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
        
        effect {
            // Some sort of non-UI logging code that doesn't need to happen on the main thread
            // and can run in parallel to the UI effect
            print("doubleCountSquared changed: \(doubleCountSquared())")
        }
        
        // NOTE(review): `group` is captured and mutated from inside a child-task closure here;
        // presumably the compiler rejects this. Confirm that it builds.
        group.addTask { @MainActor in
            // Receive updates from the background thread on the main thread
            // to update the UI
            effect {
                print("Render UI: \(doubleCountSquared())")
            }
        }
    }
    
    // Pretend this is us responding to user input
    // Four increments, two seconds apart, performed in the group body itself.
    for _ in 0..<4 {
        try? await Task.sleep(for: .seconds(2))
        count.value += 1
    }
}

@markmals
Copy link
Author

@mattmassicotte after reading your latest post, I think this is correct?

// MARK: Usage Example

/// Stand-in for an expensive computation: returns `number` squared.
func someExpensiveCalculation(_ number: Int) -> Int {
    let squared = number * number
    return squared
}

// `count` is the source signal; `doubleCount` is a memoized value derived from it.
let count = Signal(0)
let doubleCount = memoize(count.value * 2)

// Sets up a derived memo, registers a logging effect directly, then awaits a main-actor helper
// that registers the UI effect.
nonisolated func background() async {
    // Do expensive work in the background
    let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
    
    // Local helper isolated to the main actor; calling it from here requires `await`.
    @MainActor
    func updateUI() {
        // Receive updates from the background thread on the main thread
        // to update the UI
        effect {
            print("Render UI: \(doubleCountSquared())")
        }
    }
    
    // Start the background effect
    effect {
        // Some sort of non-UI logging code that doesn't need to happen on the main thread
        // and can run in parallel to the UI effect
        print("doubleCountSquared changed: \(doubleCountSquared())")
    }
    
    // Start the MainActor effect
    await updateUI()
}

// Start the background and MainActor effect without suspending
// NOTE(review): the `async let` result is discarded and never awaited; confirm how that behaves.
async let _ = background()

// Pretend this is us responding to user input
// Four increments, two seconds apart.
for _ in 0..<4 {
    try? await Task.sleep(for: .seconds(2))
    count.value += 1
}

@mattmassicotte
Copy link

This does look good, except for one thing. I actually don't know what happens when you ignore the result of an async let like that! I guess it is equivalent to the following, but I'm not actually sure.

// Unstructured task: starts `background()` without awaiting its completion here.
Task {
  await background()
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment