Skip to content

Instantly share code, notes, and snippets.

@dfed
Last active July 30, 2024 12:06
Show Gist options
  • Save dfed/bdeca7b52cb3fca7f9a3e0589ff67557 to your computer and use it in GitHub Desktop.
Save dfed/bdeca7b52cb3fca7f9a3e0589ff67557 to your computer and use it in GitHub Desktop.
// MIT License
//
// Copyright (c) 2023 Dan Federman
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
import Combine
import Foundation
/// A thread-safe object that enables the propagation of values to observers. Observers are not retained.
/// - Note: This type is `Sendable` because mutable state is bound to a single queue.
public final class Observable<Value: Sendable>: @unchecked Sendable {
// MARK: Initialization
/// Creates an Observable.
/// - Parameter initialValue: The initial value that will be propagated to
public init(initialValue: Value? = nil) {
subject = CurrentValueSubject<Value?, Never>(initialValue)
receiver = Receiver(subject: subject)
}
deinit {
subject.send(completion: .finished)
}
// MARK: Public
/// The values sent to the observable. Read-only.
public let receiver: Receiver<Value>
/// Forwards a value to current observers. Value is propagated synchronously.
/// Value is retained until the next sent value. The most recently sent value will be sent to observers registered in the future.
/// - Parameter value: The value to forward.
public func send(_ value: Value) {
subject.send(value)
}
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called. If `nil`, function calls occur on the sending queue.
public func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Value) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue? = .main)
{
receiver.streamValues(to: functionReference, whileExists: observingObject, on: queue)
}
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called. If `nil`, function calls occur on the sending queue.
public func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Value?) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue? = .main)
{
receiver.streamValues(to: functionReference, whileExists: observingObject, on: queue)
}
// MARK: Private
// CurrentValueSubject is not Sendable, but since we never use the `value` accessor we can consider ourselves Sendable.
private let subject: CurrentValueSubject<Value?, Never>
}
/// A thread-safe object that enables the receiving of values by observers. Observers are not retained.
/// - Note: This type is `Sendable` because mutable state is bound to a single queue.
public final class Receiver<Value: Sendable>: @unchecked Sendable {
// MARK: Initialization
/// Creates a Receiver.
/// - Parameter subject: The subject whose values will be streamed.
init(subject: CurrentValueSubject<Value?, Never>) {
self.subject = subject
}
// MARK: Public
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called. If `nil`, function calls occur on the sending queue.
public func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Value) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue? = .main)
{
subject.streamValues(
to: { observingObject in
{ nextValue in
if let nextValue {
functionReference(observingObject)(nextValue)
}
}
},
whileExists: observingObject,
on: queue)
}
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called. If `nil`, function calls occur on the sending queue.
public func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Value?) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue? = .main)
{
subject.streamValues(to: functionReference, whileExists: observingObject, on: queue)
}
// MARK: Private
private let subject: CurrentValueSubject<Value?, Never>
}
extension CurrentValueSubject {
/// Begins sending values to the observer's function. Values will continue to be sent until the observer deallocates.
/// - Parameters:
/// - function: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
/// - queue: The queue on which the observing object's function will be called. If `nil`, function calls occur on the sending queue.
func streamValues<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Output) -> Void,
whileExists observingObject: Observer,
on queue: DispatchQueue?)
{
let dispatchContext = DispatchContext(queue: queue)
var cancellation: AnyCancellable?
cancellation = sink(
receiveCompletion: { _ in
cancellation = nil
}, receiveValue: { [weak observingObject] nextValue in
dispatchContext.execute {
guard let observingObject else {
cancellation = nil
return
}
functionReference(observingObject)(nextValue)
}
})
// Silence the warning that `cancellation` is never read.
_ = cancellation
}
/// An executor of closures.
/// - Note: This type is `Sendable` because the Dispatch properties are never mutated.
private final class DispatchContext: @unchecked Sendable {
init(queue: DispatchQueue?) {
self.queue = queue
queue?.setSpecific(key: key, value: ())
}
/// Executes the provided closure on the receiver's `queue`.
/// If the current queue is the receiver's `queue` the closure will be executed immediately.
/// If the reciever's `queue` is `nil` the closure will be executed immediately.
/// - Parameter closure: The closure to execute.
func execute(closure: @escaping () -> Void) {
guard let queue else {
closure()
return
}
let closureHolder = UncheckedClosureHolder(closure: closure)
if DispatchQueue.getSpecific(key: key) == nil {
queue.async {
closureHolder.closure()
}
} else {
closure()
}
}
private let queue: DispatchQueue?
private let key = DispatchSpecificKey<Void>()
}
/// A type that wraps a non-Sendable closure to avoid a compiler warning.
///
/// We need this type because `DispatchQueue.async` requires a `Sendable` closure as a parameter,
/// and we have no way of making our closures `Sendable`. For example, the `Observable` type utilizes
/// a function reference within a closure, and function references are not `Sendable` since they have no control
/// over when they are executed. We also modify mutable state in our closures – this is not a `Sendable` act!
/// However, we always modify or inspect mutable state on a single queue, meaning that our work is effectively
/// `Sendable`, even if we can't convince the compiler of that.
private struct UncheckedClosureHolder: @unchecked Sendable {
let closure: () -> Void
}
}
import XCTest
@MainActor
final class ObservableTests: XCTestCase {
// MARK: XCTestCase
override func setUp() async throws {
try await super.setUp()
systemUnderTest = Observable()
}
// MARK: Behavior Tests
func test_streamValues_streamsLastSetValue() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1_000)
counter.incrementAndExpectCount(equals: 1)
}
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_streamsNewValuesInOrder() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver)
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
XCTAssertEqual(counter.count, 1_000)
}
func test_streamValues_streamsLastSetValueThenAllNewValuesInOrder() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value + 1)
}
systemUnderTest.send(0)
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver)
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
XCTAssertEqual(counter.count, 1_001)
}
func test_streamValues_doesNotStreamAllPastValues() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1)
counter.incrementAndExpectCount(equals: value)
}
systemUnderTest.send(0)
systemUnderTest.send(1)
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_doesNotRetainObserver() {
let counter = Counter()
var receiver: ValueReceiver? = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
if let receiver {
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver)
}
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
if iteration == 500 {
receiver = nil
}
}
XCTAssertEqual(counter.count, 500)
}
func test_streamValues_streamsValuesOnGivenQueue() {
let key = DispatchSpecificKey<Void>()
let queue = DispatchQueue(label: #function)
queue.setSpecific(key: key, value: ())
let expectation = self.expectation(description: #function)
systemUnderTest = Observable(initialValue: 0)
let receiver = ValueReceiver<Int>() { _ in
XCTAssertNotNil(DispatchQueue.getSpecific(key: key))
expectation.fulfill()
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: queue)
waitForExpectations(timeout: 1.0)
}
func test_streamValues_streamsValuesOnSenderQueueIfNoQueueProvided() {
let key = DispatchSpecificKey<Void>()
let queue = DispatchQueue(label: #function)
queue.setSpecific(key: key, value: ())
let expectation = self.expectation(description: #function)
let systemUnderTest = Observable<Int>()
let receiver = ValueReceiver<Int>() { _ in
XCTAssertNotNil(DispatchQueue.getSpecific(key: key))
expectation.fulfill()
}
systemUnderTest.streamValues(to: ValueReceiver.receiveValue, whileExists: receiver, on: nil)
queue.async {
systemUnderTest.send(0)
}
waitForExpectations(timeout: 1.0)
}
// MARK: Private
private var systemUnderTest: Observable<Int>! = Observable(initialValue: 0)
private final class ValueReceiver<Value> {
init(receiving: @escaping (Value) -> Void) {
self.receiving = receiving
}
func receiveValue(_ value: Value) {
receiving(value)
}
private let receiving: (Value) -> Void
}
private final class Counter {
func incrementAndExpectCount(equals expectedCount: Int, file: StaticString = #filePath, line: UInt = #line) {
increment()
XCTAssertEqual(expectedCount, count, file: file, line: line)
}
func increment() {
count += 1
}
var count = 0
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment