-
-
Save markmals/e880043a5f59436b2cc581f9692e6fd6 to your computer and use it in GitHub Desktop.
// Credit Ryan Carniato: https://frontendmasters.com/courses/reactivity-solidjs/ | |
// & Marc Grabanski: https://gist.github.com/1Marc/09e739caa6a82cc176ab4c2abd691814 | |
extension Array<Observer> { | |
fileprivate var current: Observer? { | |
last | |
} | |
} | |
private final class Observer { | |
// FIXME: This is not thread safe | |
// This is usually implemented as a thread local value in multi-threaded languages: | |
// - Example in Swift: https://github.com/unixzii/swift-signal/blob/main/Sources/SwiftSignal/Scope.swift#L8 | |
// - Example in Rust: https://github.com/leptos-rs/leptos/blob/main/reactive_graph/src/graph/subscriber.rs#L5-L7 | |
static var context: [Observer] = [] | |
private var sideEffect: (() -> Void) = {} | |
private var dependencies = Set<ReferenceSet<Observer>>() | |
func registerEffect(_ executor: @escaping () -> Void) { | |
sideEffect = executor | |
} | |
func execute() { | |
// start a fresh run to capture new dependencies and remove stale dependencies | |
cleanup() | |
// Add ourselves to the context so any signals about to run can associate themselves with us | |
Self.context.append(self) | |
// Run the side effect and capture dependencies | |
sideEffect() | |
// Remove ourselves from the context | |
Self.context.removeLast() | |
} | |
func observe<T>(_ signal: Signal<T>) { | |
signal.subscriptions.insert(self) | |
dependencies.insert(signal.subscriptions) | |
} | |
private func cleanup() { | |
for dependency in dependencies { | |
dependency.remove(self) | |
} | |
dependencies.removeAll() | |
} | |
} | |
extension Observer: Hashable { | |
static func == (lhs: Observer, rhs: Observer) -> Bool { | |
lhs === rhs | |
} | |
func hash(into hasher: inout Hasher) { | |
hasher.combine(ObjectIdentifier(self)) | |
} | |
} | |
// Reactive atomic value | |
// Reading from `value` inside of an `effect` will cause the effect to re-run when `value` is updated | |
public final class Signal<T> { | |
fileprivate var subscriptions = ReferenceSet<Observer>() | |
private var storage: T | |
public var value: T { | |
get { | |
// Get the current Observer from the context and start observing ourself | |
Observer.context.current?.observe(self) | |
return storage | |
} | |
set { | |
storage = newValue | |
for observer in subscriptions { | |
// Run the side effect for all of the Observers observing us | |
observer.execute() | |
} | |
} | |
} | |
public init(_ initialValue: T) { | |
storage = initialValue | |
} | |
} | |
// Reactive side-effect | |
// Any signals read inside of `sideEffectFunction` will be observed and `sideEffectFunction` | |
// will be re-run when any observed signals update | |
public func effect(_ sideEffectFunction: @escaping () -> Void) { | |
let observer = Observer() | |
observer.registerEffect(sideEffectFunction) | |
// Effects must run immediately to capture their dependencies | |
observer.execute() | |
} | |
// Returns a signal that only updates when signals used inside `cachedExpression` update | |
public func memoize<T>(_ cachedExpression: @escaping @autoclosure () -> T) -> (() -> T) { | |
let memoizedSignal = Signal<T?>(nil) | |
// Only update `memoizedSignal` when signals used inside `cachedExpression` change | |
effect { memoizedSignal.value = cachedExpression() } | |
// return a read-only signal (a closure) | |
return { memoizedSignal.value! } | |
} | |
// Reads the value returned by `nonReactiveReadsFn` without tracking any signals | |
// used inside `nonReactiveReadsFn` | |
public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T { | |
let prevContext = Observer.context | |
Observer.context = [] | |
let res = nonReactiveReadsFn() | |
Observer.context = prevContext | |
return res | |
} | |
// MARK: Usage Examples | |
let count = Signal(0) | |
let doubleCount = memoize(count.value * 2) | |
effect { | |
print("2 × \(count.value) is \(doubleCount())") | |
} | |
for _ in 0...4 { | |
try? await Task.sleep(.seconds(2)) | |
count.value = count.value + 1 | |
} | |
// Prints: | |
// 2 × 0 is 0 | |
// 2 × 1 is 2 | |
// 2 × 2 is 4 | |
// 2 × 3 is 6 | |
// 2 × 4 is 8 |
// A reference type Set | |
private final class ReferenceSet<Element: Hashable>: Hashable, Collection { | |
typealias Element = Element | |
typealias Iterator = Set<Element>.Iterator | |
typealias Index = Set<Element>.Index | |
typealias Indices = Set<Element>.Indices | |
typealias SubSequence = Set<Element>.SubSequence | |
private var inner = Set<Element>() | |
func makeIterator() -> Iterator { | |
inner.makeIterator() | |
} | |
var startIndex: Index { | |
inner.startIndex | |
} | |
var endIndex: Index { | |
inner.endIndex | |
} | |
var indices: Indices { | |
inner.indices | |
} | |
func index(after i: Index) -> Index { | |
inner.index(after: i) | |
} | |
subscript(position: Index) -> Element { | |
inner[position] | |
} | |
subscript(bounds: Range<Index>) -> SubSequence { | |
inner[bounds] | |
} | |
func insert(_ newMember: Element) { | |
inner.insert(newMember) | |
} | |
func remove(_ member: Element) { | |
inner.remove(member) | |
} | |
static func == (lhs: ReferenceSet, rhs: ReferenceSet) -> Bool { | |
lhs.inner == rhs.inner | |
} | |
func hash(into hasher: inout Hasher) { | |
inner.hash(into: &hasher) | |
} | |
} |
Wait wait wait. I just realized that the "last" observer is important, but now the state is global. There are no data races, but is it right? I think the execute
method needs to be iterated on more to ensure this cannot be used incorrectly.
Basically, ObserverContext
cannot have independent append
and removeLast
methods, it needs one single atomic operation that does both.
Ok second try. This ObserverContext
class now cannot be used in that incorrect way. I also had to move to a recursive lock, because I wasn't sure if a side effect could itself result in observeration changes.
import Foundation
private final class ObserverContext: @unchecked Sendable {
private var context: [Observer] = []
private let lock = NSRecursiveLock()
func setCurrent(_ observer: Observer, _ block: () -> Void) {
lock.withLock {
context.append(observer)
block()
_ = context.removeLast()
}
}
func withCurrent(_ block: (Observer?) -> Void) {
lock.withLock {
let last = context.last
block(last)
}
}
public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
lock.withLock {
let prevContext = context
self.context = []
let res = nonReactiveReadsFn()
self.context = prevContext
return res
}
}
}
private final class Observer {
static let context = ObserverContext()
private var sideEffect: (() -> Void) = {}
private var dependencies = Set<ReferenceSet<Observer>>()
func registerEffect(_ executor: @escaping () -> Void) {
sideEffect = executor
}
func execute() {
// start a fresh run to capture new dependencies and remove stale dependencies
cleanup()
// Add ourselves to the context so any signals about to run can associate themselves with us
Self.context.setCurrent(self) {
// Run the side effect and capture dependencies
sideEffect()
}
}
func observe<T>(_ signal: Signal<T>) {
signal.subscriptions.insert(self)
dependencies.insert(signal.subscriptions)
}
private func cleanup() {
for dependency in dependencies {
dependency.remove(self)
}
dependencies.removeAll()
}
}
extension Observer: Hashable {
static func == (lhs: Observer, rhs: Observer) -> Bool {
lhs === rhs
}
func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}
// Reactive atomic value
// Reading from `value` inside of an `effect` will cause the effect to re-run when `value` is updated
public final class Signal<T> {
fileprivate var subscriptions = ReferenceSet<Observer>()
private var storage: T
public var value: T {
get {
// Get the current Observer from the context and start observing ourself
Observer.context.withCurrent {
$0?.observe(self)
}
return storage
}
set {
storage = newValue
for observer in subscriptions {
// Run the side effect for all of the Observers observing us
observer.execute()
}
}
}
public init(_ initialValue: T) {
storage = initialValue
}
}
// Reactive side-effect
// Any signals read inside of `sideEffectFunction` will be observed and `sideEffectFunction`
// will be re-run when any observed signals update
public func effect(_ sideEffectFunction: @escaping () -> Void) {
let observer = Observer()
observer.registerEffect(sideEffectFunction)
// Effects must run immediately to capture their dependencies
observer.execute()
}
// Returns a signal that only updates when signals used inside `cachedExpression` update
public func memoize<T>(_ cachedExpression: @escaping @autoclosure () -> T) -> (() -> T) {
let memoizedSignal = Signal<T?>(nil)
// Only update `memoizedSignal` when signals used inside `cachedExpression` change
effect { memoizedSignal.value = cachedExpression() }
// return a read-only signal (a closure)
return { memoizedSignal.value! }
}
// Reads the value returned by `nonReactiveReadsFn` without tracking any signals
// used inside `nonReactiveReadsFn`
public func untrack<T>(_ nonReactiveReadsFn: () -> T) -> T {
Observer.context.untrack(nonReactiveReadsFn)
}
// MARK: Usage Examples
let count = Signal(0)
let doubleCount = memoize(count.value * 2)
effect {
print("2 × \(count.value) is \(doubleCount())")
}
for _ in 0...4 {
try? await Task.sleep(for: .seconds(2))
count.value = count.value + 1
}
@mattmassicotte Okay I went down a bit of a Sendable rabbit hole with ThreadLocal
and got this down to two errors with what I think is a better example that more accurately demonstrates possible multi-threaded usage. I feel like I'm probably over-using Mutex
, but I wasn't sure how else to ensure sendability of everything without it. I could switch back to NSLock
and @unchecked Sendable
, but I would kind of prefer to use constructs that allow the compiler to check everything, if possible.
Update: Got it down to one error now, just the Mutex
not being able to be stored in a Set
because it's not Hashable
.
import Synchronization
import Foundation
struct ThreadLocal<Value>: Sendable where Value: Sendable {
private var threadDictionary: NSMutableDictionary {
Thread.current.threadDictionary
}
private let key: String
init(_ wrappedValue: Value?) {
let box = Box(wrappedValue)
self.key = "\(ObjectIdentifier(box))"
guard let wrappedValue else {
// Is this necessary in the init too?
threadDictionary.removeObject(forKey: self.key)
return
}
guard let threadBox = threadDictionary.object(forKey: self.key) as? Box<Value> else {
threadDictionary.setObject(box, forKey: NSString(string: self.key))
return
}
threadBox.wrappedValue = wrappedValue
}
var wrappedValue: Value? {
get {
(threadDictionary.object(forKey: key) as? Box<Value>)?.wrappedValue
}
nonmutating set {
guard let newValue else {
threadDictionary.removeObject(forKey: key)
return
}
guard let box = threadDictionary.object(forKey: key) as? Box<Value> else {
threadDictionary.setObject(Box(newValue), forKey: NSString(string: key))
return
}
box.wrappedValue = newValue
}
}
}
extension ThreadLocal {
private class Box<Wrapped> {
var wrappedValue: Wrapped
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
}
extension Array<Observer> {
fileprivate var current: Observer? {
last
}
}
// One Observer could possibly be referenced on multiple threads, because... [1]
private final class Observer: Sendable {
// One context per thread
static let context = ThreadLocal<[Observer]>([])
private let sideEffect: Mutex<(() -> Void)>
// FIXME: Mutex doesn't conform to Hashable... :(
// Should I just use an NSLock here instead? I already have to use Foundation for ThreadLocal
//
// Swift Compiler Error: Type 'Mutex<Set<Observer>>' does not conform to protocol 'Hashable'
private let dependencies = Mutex(Set<Mutex<Set<Observer>>>())
init(_ effect: @Sendable @escaping () -> Void) {
self.sideEffect = Mutex(effect)
}
func execute() {
sideEffect.withLock { effect in
guard var ctx = Self.context.wrappedValue else { return }
cleanup()
ctx.append(self)
effect()
ctx.removeLast()
}
}
func observe<T>(_ signal: Signal<T>) {
dependencies.withLock {
signal.subscriptions.withLock { $0.insert(self) }
$0.insert(signal.subscriptions)
}
}
private func cleanup() {
dependencies.withLock {
for dependency in $0 {
dependency.remove(self)
}
$0.removeAll()
}
}
}
extension Observer: Hashable {
static func == (lhs: Observer, rhs: Observer) -> Bool {
lhs === rhs
}
func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}
public final class Signal<T>: Sendable where T: Sendable {
// [1] ...a Signal can be referenced on multiple threads, and a Signal contains a reference to its Observers
fileprivate let subscriptions = Mutex(Set<Observer>())
private let storage: Mutex<T>
public var value: T {
get {
storage.withLock { lockedStorage in
Observer.context.wrappedValue?.current?.observe(self)
return lockedStorage
}
}
set {
storage.withLock { lockedStorage in
lockedStorage = newValue
subscriptions.withLock { lockedSubscriptions in
for observer in lockedSubscriptions {
observer.execute()
}
}
}
}
}
public init(_ initialValue: T) {
storage = Mutex(initialValue)
}
}
public func effect(_ sideEffectFunction: @Sendable @escaping () -> Void) {
let observer = Observer(sideEffectFunction)
observer.execute()
}
public typealias Accessor<T> = @Sendable () -> T
public func memoize<T: Sendable>(_ cachedExpression: @Sendable @escaping @autoclosure () -> T) -> Accessor<T> {
let memoizedSignal = Signal<T?>(nil)
effect { memoizedSignal.value = cachedExpression() }
return { memoizedSignal.value! }
}
public func untrack<T>(_ nonReactiveReadsFn: Accessor<T>) -> T {
let prevContext = Observer.context
Observer.context.wrappedValue = []
let result = nonReactiveReadsFn()
Observer.context.wrappedValue = prevContext.wrappedValue
return result
}
// MARK: Usage Example
func someExpensiveCalculation(_ number: Int) -> Int {
// Pretend...
number * number
}
let count = Signal(0)
let doubleCount = memoize(count.value * 2)
Task {
// Do expensive work on a background thread/thread pool/task
let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
effect {
// Some sort of non-UI logging code that doesn't need to happen on the main thread
// and can run in parallel to the UI effect
print("doubleCountSquared changed: \(doubleCountSquared())")
}
Task { @MainActor in
// Receive updates from the background thread on the main thread
// to update the UI
effect {
print("Render UI: \(doubleCountSquared())")
}
}
}
// Pretend this is us responding to user input on the main thread
Task { @MainActor in
for _ in 0..<4 {
try? await Task.sleep(for: .seconds(2))
count.value += 1
}
}
It's getting harder for me to offer good help here. I don't know how the system is supposed to behave! But, I still am extremely skeptical of using a thread-local unless I really understand why it is necessary.
Another problem is you have structured your use of Task here like how GCD queues might be used. But that is not how isolation works. Top-level swift code is isolated to the MainActor. So, your first Task is not actually in the background. And your other Tasks would all be MainActor even without your annotations.
Finally, doesn't this program terminate before finishing? There's nothing waiting on these tasks.
Ah I didn't realize that the top level was already MainActor isolated. In that case, would something like this do what I'm expecting?
// MARK: Usage Example
func someExpensiveCalculation(_ number: Int) -> Int {
// Pretend...
number * number
}
let count = Signal(0)
let doubleCount = memoize(count.value * 2)
Task(priority: .background) {
// Do expensive work on a background thread/thread pool/task
let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
effect {
// Some sort of non-UI logging code that doesn't need to happen on the main thread
// and can run in parallel to the UI effect
print("doubleCountSquared changed: \(doubleCountSquared())")
}
Task { @MainActor in
// Receive updates from the background thread on the main thread
// to update the UI
effect {
print("Render UI: \(doubleCountSquared())")
}
}
}
// Pretend this is us responding to user input
for _ in 0..<4 {
try? await Task.sleep(for: .seconds(2))
count.value += 1
}
Or do I really just want a nonisolated async function instead of a task at all?
// MARK: Usage Example
func someExpensiveCalculation(_ number: Int) -> Int {
// Pretend...
number * number
}
let count = Signal(0)
let doubleCount = memoize(count.value * 2)
nonisolated func background() async {
// Do expensive work on a background thread/thread pool/task
let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
effect {
// Some sort of non-UI logging code that doesn't need to happen on the main thread
// and can run in parallel to the UI effect
print("doubleCountSquared changed: \(doubleCountSquared())")
}
Task { @MainActor in
// Receive updates from the background thread on the main thread
// to update the UI
effect {
print("Render UI: \(doubleCountSquared())")
}
}
}
Task { await background() }
// Pretend this is us responding to user input
for _ in 0..<4 {
try? await Task.sleep(for: .seconds(2))
count.value += 1
}
Or a TaskGroup?
// MARK: Usage Example
func someExpensiveCalculation(_ number: Int) -> Int {
// Pretend...
number * number
}
let count = Signal(0)
let doubleCount = memoize(count.value * 2)
await withTaskGroup(of: Void.self) { group in
group.addTask {
// Do expensive work on a background thread/thread pool/task
let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
effect {
// Some sort of non-UI logging code that doesn't need to happen on the main thread
// and can run in parallel to the UI effect
print("doubleCountSquared changed: \(doubleCountSquared())")
}
group.addTask { @MainActor in
// Receive updates from the background thread on the main thread
// to update the UI
effect {
print("Render UI: \(doubleCountSquared())")
}
}
}
// Pretend this is us responding to user input
for _ in 0..<4 {
try? await Task.sleep(for: .seconds(2))
count.value += 1
}
}
@mattmassicotte after reading your latest post, I think this is correct?
// MARK: Usage Example
func someExpensiveCalculation(_ number: Int) -> Int {
// Pretend...
number * number
}
let count = Signal(0)
let doubleCount = memoize(count.value * 2)
nonisolated func background() async {
// Do expensive work in the background
let doubleCountSquared = memoize(someExpensiveCalculation(doubleCount()))
@MainActor
func updateUI() {
// Receive updates from the background thread on the main thread
// to update the UI
effect {
print("Render UI: \(doubleCountSquared())")
}
}
// Start the background effect
effect {
// Some sort of non-UI logging code that doesn't need to happen on the main thread
// and can run in parallel to the UI effect
print("doubleCountSquared changed: \(doubleCountSquared())")
}
// Start the MainActor effect
await updateUI()
}
// Start the background and MainActor effect without suspending
async let _ = background()
// Pretend this is us responding to user input
for _ in 0..<4 {
try? await Task.sleep(for: .seconds(2))
count.value += 1
}
This does look good, except for one thing. I actually don't know what happens when you ignore the result of an async let
like that! I guess it is equivalent to but I'm not actually sure.
Task {
await background()
}
I ran this code unmodified (except for adding the
for:
label to the sleep) with Swift 5, and got this output:Seems like there may be a bug? But regardless! I was able to modify this code to get it building with Swift 6. I didn't have to touch the
ReferenceSet
type.What I had to do was internalize the global state into a single, unchecked type with a lock. Luckily, there was really no concurrency being used, and that's important because my locking technique will probably only work as long as Observer and Signal have no async methods. I used a lock from Foundation because it was easy. Swift 6 now includes a
Mutex
type that is cross-platform and nicer, but raises the minimum OS requirements and might not be what you want.