Skip to content

Instantly share code, notes, and snippets.

@dfed
Last active April 13, 2023 05:23
Show Gist options
  • Save dfed/2d20b80cbc5fef6c7ecb94f797e8a5de to your computer and use it in GitHub Desktop.
Save dfed/2d20b80cbc5fef6c7ecb94f797e8a5de to your computer and use it in GitHub Desktop.
// MIT License
//
// Copyright (c) 2023 Dan Federman
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
import Dispatch
/// A thread-safe object that enables the propagation of values to observers. Observers are not retained.
/// - Note: This type is `Sendable` because mutable state is bound to a single queue.
public final class Observable<Value: Sendable>: @unchecked Sendable {
// MARK: Initialization
/// Creates an Observable that operates on the input `queue`.
/// - Parameters:
/// - initialValue: The initial value that will be propagated to
/// - queue: The queue on which the instance will operate.
public init(initialValue: Value? = nil, queue: DispatchQueue = .main) {
value = initialValue
context = DispatchContext(queue: queue)
}
// MARK: Public
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called.
public func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Value) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue = .main)
{
streamValues(to: Propogator(observingObject, functionReference: functionReference, queue: queue))
}
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called.
public func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Value?) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue = .main)
{
streamValues(to: Propogator(observingObject, functionReference: functionReference, queue: queue))
}
/// Forwards a value to current observers. Value is propogated asynchronously.
/// Value is retained until the next sent value. The most recently sent value will be sent to observers registered in the future.
/// - Parameter value: The value to forward.
public func send(_ value: Value) {
context.execute {
self.value = value
self.compactObservers_inExecutionContext()
for propogator in self.propogators {
propogator.setValue(value)
}
}
}
// MARK: Private
private var value: Value?
private var propogators = [Propogator]()
private let context: DispatchContext
private func streamValues(to propogator: Propogator) {
context.execute {
self.compactObservers_inExecutionContext()
self.propogators.append(propogator)
propogator.setValue(self.value)
}
}
private func compactObservers_inExecutionContext() {
propogators = propogators.filter { $0.isValid }
}
/// Propogates values to a weakly held object via a function reference on a specified queue.
/// - Note: This type is `Sendable` because the only mutable value is a weak reference that is never manual;y mutated.
private final class Propogator: @unchecked Sendable {
init<T: AnyObject>(_ receiver: T, functionReference: @escaping (T) -> (Value) -> Void, queue: DispatchQueue) {
let context = DispatchContext(queue: queue)
valueSetter = { [weak receiver] nextValue in
if let receiver, let nextValue {
context.execute {
functionReference(receiver)(nextValue)
}
}
}
self.context = context
self.receiver = receiver
}
init<T: AnyObject>(_ receiver: T, functionReference: @escaping (T) -> (Value?) -> Void, queue: DispatchQueue) {
let context = DispatchContext(queue: queue)
valueSetter = { [weak receiver] nextValue in
if let receiver {
context.execute {
functionReference(receiver)(nextValue)
}
}
}
self.context = context
self.receiver = receiver
}
func setValue(_ value: Value?) {
valueSetter(value)
}
var isValid: Bool {
receiver != nil
}
private let valueSetter: (Value?) -> Void
private let context: DispatchContext
private weak var receiver: AnyObject?
}
/// An executor of closures.
/// - Note: This type is `Sendable` because the Dispatch properties are never mutated.
private final class DispatchContext: @unchecked Sendable {
init(queue: DispatchQueue) {
self.queue = queue
queue.setSpecific(key: key, value: ())
}
/// Executes the provided closure on the receiver's queue.
/// If the current queue is the receiver's queue, the closure will be executed immediately.
/// - Parameter closure: The closure to execute.
func execute(closure: @escaping () -> Void) {
let closureHolder = UncheckedClosureHolder(closure: closure)
if DispatchQueue.getSpecific(key: key) == nil {
queue.async {
closureHolder.closure()
}
} else {
closure()
}
}
private let queue: DispatchQueue
private let key = DispatchSpecificKey<Void>()
}
/// A type that wraps a non-Sendable closure to avoid a compiler warning.
///
/// We need this type because `DispatchQueue.async` requires a `Sendable` closure as a parameter,
/// and we have no way of making our closures `Sendable`. For example, the `Observable` type utilizes
/// a function reference within a closure, and function references are not `Sendable` since they have no control
/// over when they are executed. We also modify mutable state in our closures – this is not a `Sendable` act!
/// However, we always modify or inspect mutable state on a single queue, meaning that our work is effectively
/// `Sendable`, even if we can't convince the compiler of that.
private struct UncheckedClosureHolder: @unchecked Sendable {
let closure: () -> Void
}
}
import XCTest
@MainActor
final class ObservableTests: XCTestCase {
// MARK: XCTestCase
override func setUp() async throws {
try await super.setUp()
systemUnderTest = Observable()
}
// MARK: Behavior Tests
func test_streamValues_streamsLastSetValue() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1_000)
counter.incrementAndExpectCount(equals: 1)
}
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: .main)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_streamsNewValuesInOrder() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: .main)
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
XCTAssertEqual(counter.count, 1_000)
}
func test_streamValues_streamsLastSetValueThenAllNewValuesInOrder() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value + 1)
}
systemUnderTest.send(0)
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: .main)
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
XCTAssertEqual(counter.count, 1_001)
}
func test_streamValues_doesNotStreamAllPastValues() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1)
counter.incrementAndExpectCount(equals: value)
}
systemUnderTest.send(0)
systemUnderTest.send(1)
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: .main)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_doesNotRetainObserver() {
let counter = Counter()
var receiver: ValueReceiver? = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
if let receiver {
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: .main)
}
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
if iteration == 500 {
receiver = nil
}
}
XCTAssertEqual(counter.count, 500)
}
func test_streamValues_streamsValuesOnExpectedQueue() {
let key = DispatchSpecificKey<Void>()
let queue = DispatchQueue(label: #function)
queue.setSpecific(key: key, value: ())
let expectation = self.expectation(description: #function)
systemUnderTest = Observable(initialValue: 0)
let receiver = ValueReceiver<Int>() { _ in
XCTAssertNotNil(DispatchQueue.getSpecific(key: key))
expectation.fulfill()
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: queue)
waitForExpectations(timeout: 1.0)
}
// MARK: Private
private var systemUnderTest: Observable<Int>! = Observable(initialValue: 0, queue: .main)
private final class ValueReceiver<Value> {
init(receiving: @escaping (Value) -> Void) {
self.receiving = receiving
}
func receiveValue(_ value: Value) {
receiving(value)
}
private let receiving: (Value) -> Void
}
private final class Counter {
func incrementAndExpectCount(equals expectedCount: Int, file: StaticString = #filePath, line: UInt = #line) {
increment()
XCTAssertEqual(expectedCount, count, file: file, line: line)
}
func increment() {
count += 1
}
var count = 0
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment