Skip to content

Instantly share code, notes, and snippets.

@dfed
Last active September 23, 2024 19:28
Show Gist options
  • Save dfed/640292bf1b00c65c8e237730c84e547f to your computer and use it in GitHub Desktop.
Save dfed/640292bf1b00c65c8e237730c84e547f to your computer and use it in GitHub Desktop.
// MIT License
//
// Copyright (c) 2023 Dan Federman
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
@_exported import Combine
import Foundation
// MARK: - Publisher
extension Publisher {
/// Attaches a weakly held subscriber with function-based behavior.
/// The observer is not retained, and values will continue to be sent to the observer until it deallocates.
///
/// - Parameters:
/// - functionReference: A reference to a function on the observer type that receives a Value. e.g. `Observer.setValue`.
/// - observingObject: The observing object whose function will be sent values.
public func sink<Observer: AnyObject>(
to functionReference: @escaping (Observer) -> (Output) -> Void,
whileExists observingObject: Observer)
{
var cancellation: AnyCancellable?
cancellation = sink(
receiveCompletion: { _ in
cancellation = nil
}, receiveValue: { [weak observingObject] nextValue in
guard let observingObject else {
cancellation = nil
return
}
functionReference(observingObject)(nextValue)
})
// Silence the warning that `cancellation` is never read.
_ = cancellation
}
/// Specifies the queue on which to receive elements from the publisher.
/// Received values are propagated synchronously if publishing occurs on the specified queue.
///
/// - Parameters:
/// - queue: The queue the publisher uses for element delivery.
/// - Returns: A publisher that delivers elements using the specified scheduler.
public func receiveSynchronouslyIfPossible(on queue: DispatchQueue) -> some Publisher<Output, Failure> {
PossiblySynchronousPublisher(upstreamPublisher: self, queue: queue)
}
}
// MARK: - PossiblySynchronousPublisher
/// A Publisher that propagates values synchronously if the upstream publisher publishes on the desired queue.
private struct PossiblySynchronousPublisher<Upstream: Publisher>: Publisher {
// MARK: Initialization
public init(upstreamPublisher: Upstream, queue: DispatchQueue) {
self.upstreamPublisher = upstreamPublisher
context = DispatchContext(queue: queue)
}
// MARK: Publisher
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
func receive<S>(subscriber: S) where S : Subscriber, Upstream.Failure == S.Failure, Upstream.Output == S.Input {
var cancellation: AnyCancellable?
cancellation = upstreamPublisher.sink(
receiveCompletion: { completion in
subscriber.receive(completion: completion)
cancellation = nil
}, receiveValue: { nextValue in
context.execute {
_ = subscriber.receive(nextValue)
}
})
// Silence the warning that `cancellation` is never read.
_ = cancellation
}
// MARK: Private
private let upstreamPublisher: Upstream
private let context: DispatchContext
}
/// An executor of closures.
/// - Note: This type is `Sendable` because the Dispatch properties are never mutated.
private final class DispatchContext: @unchecked Sendable {
init(queue: DispatchQueue) {
self.queue = queue
if queue == .main {
// Use a shared key for `main` to avoid repeatedly calling `setSpecific(key:value:)`
key = Self.mainKey
} else {
key = DispatchSpecificKey<Void>()
queue.setSpecific(key: key, value: ())
}
}
/// Executes the provided closure on the receiver's `queue`.
/// If the current queue is the receiver's `queue` the closure will be executed immediately.
/// - Parameter closure: The closure to execute.
func execute(closure: @escaping () -> Void) {
let closureHolder = UncheckedClosureHolder(closure: closure)
if DispatchQueue.getSpecific(key: key) == nil {
queue.async {
closureHolder.closure()
}
} else {
closure()
}
}
private let queue: DispatchQueue
private let key: DispatchSpecificKey<Void>
/// A shared key for the main queue.
private static let mainKey: DispatchSpecificKey<Void> = {
let mainKey = DispatchSpecificKey<Void>()
DispatchQueue.main.setSpecific(key: mainKey, value: ())
return mainKey
}()
/// A type that wraps a non-Sendable closure to avoid a compiler warning.
///
/// We need this type because `DispatchQueue.async` requires a `Sendable` closure as a parameter,
/// and we have no way of making our closures `Sendable`. For example, the `Observable` type utilizes
/// a function reference within a closure, and function references are not `Sendable` since they have no control
/// over when they are executed. We also modify mutable state in our closures – this is not a `Sendable` act!
/// However, we always modify or inspect mutable state on a single queue, meaning that our work is effectively
/// `Sendable`, even if we can't convince the compiler of that.
private struct UncheckedClosureHolder: @unchecked Sendable {
let closure: () -> Void
}
}
// MARK: - LazyPublished
/// A type that publishes a property whose initial value is loaded just-in-time.
///
/// When the property changes, publishing occurs in the property's `willSet` block, meaning subscribers receive the new value before it's actually set on the property.
///
/// - Note: This property wrapper can only be used on types that conform to `ObservableObject`.
/// - SeeAlso: `@Published`
@propertyWrapper
public struct LazyPublished<Value: Sendable> {
// MARK: Initialization
/// Initializes an `Observable` with a closure that constructs its first value.
/// - Parameter wrappedValue: A closure that creates the initial value of the `Observable`. Will only be called once.
public init(lazyWrappedValue: @Sendable @escaping () -> Value) {
subject = LazyCurrentValueSubject(lazyWrappedValue)
}
// MARK: Public
public static subscript<EnclosingType: ObservableObject>(
_enclosingInstance instance: EnclosingType,
wrapped wrappedKeyPath: ReferenceWritableKeyPath<EnclosingType, Value>,
storage storageKeyPath: ReferenceWritableKeyPath<EnclosingType, Self>
) -> Value where EnclosingType.ObjectWillChangePublisher == ObservableObjectPublisher
{
get {
instance[keyPath: storageKeyPath].subject.value
}
set {
instance[keyPath: \.objectWillChange].send()
instance[keyPath: storageKeyPath].subject.send(newValue)
}
}
@available(*, unavailable,
message: "@LazyPublished can only be applied to `ObservableObject`-conforming types"
)
public var wrappedValue: Value {
get { fatalError() }
set { fatalError() }
}
/// The stream represented as a Publisher.
public var projectedValue: Publisher {
Publisher(subject: subject)
}
// MARK: Publisher
public struct Publisher: Combine.Publisher {
init(subject: LazyCurrentValueSubject<Value, Never>) {
self.subject = subject
}
public func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Value == S.Input {
subject.receive(subscriber: subscriber)
}
public typealias Output = Value
public typealias Failure = Never
private let subject: LazyCurrentValueSubject<Value, Never>
}
// MARK: Private
private let subject: LazyCurrentValueSubject<Value, Never>
}
// MARK: - LazyCurrentValueSubject
/// A subject that wraps a single value and publishes a new element whenever the value changes.
///
/// Unlike `CurrentValueSubject`, `LazyCurrentValueSubject` is initialized with a lazily initialized value.
///
/// Calling `LazyCurrentValueSubject.send(_:)` on a `LazyCurrentValueSubject` also updates the current value, making it equivalent to updating the `LazyCurrentValueSubject/value` directly.
public final class LazyCurrentValueSubject<Output: Sendable, Failure: Error>: Subject {
// MARK: Initialization
/// Initializes an `Observable` with a closure that constructs its first value.
/// - Parameter lazyValue: A closure that creates the initial value of the `Observable`. Will only be called once.
public init(_ lazyValue: @Sendable @escaping () -> Output) {
original = CurrentValueSubject<@Sendable () -> Output, Failure>(lazyValue)
projected = CurrentValueSubject<Output?, Failure>(nil)
}
deinit {
// Clean up the internal Combine subscriptions.
// This call has no effect on consumers, but it does lower system memory usage.
send(completion: .finished)
}
// MARK: Subject
public func send(subscription: Subscription) {
// Ensure the lazy value is initialized.
_ = value
projected.send(subscription: subscription)
}
// MARK: Publisher
public func send(_ value: Output) {
self.value = value
}
public func send(completion: Subscribers.Completion<Failure>) {
original.send(completion: completion)
projected.send(completion: completion)
}
public func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
// Ensure the lazy value is initialized.
_ = value
projected.compactMap { $0 }.receive(subscriber: subscriber)
}
// MARK: Internal
/// The current value of the receiver.
var value: Output {
get {
initialValueLock.lock()
if projected.value == nil {
var cancellation: AnyCancellable?
cancellation = original.sink(
receiveCompletion: { _ in
cancellation = nil
}, receiveValue: { [projected] nextValue in
projected.value = nextValue()
})
// Silence the warning that `cancellation` is never read.
_ = cancellation
}
initialValueLock.unlock()
// Since we set a non-nil value above, we will always have a nextValue.
return projected.value!
}
set {
original.send( { newValue } )
}
}
// MARK: Private
private let original: CurrentValueSubject<@Sendable () -> Output, Failure>
private let projected: CurrentValueSubject<Output?, Failure>
private let initialValueLock: NSLock = NSLock()
}
// MARK: - Tests
import XCTest
@MainActor
final class ObservableTests: XCTestCase {
// MARK: Behavior Tests
func test_streamValues_streamsLastSetValue() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1_000)
counter.incrementAndExpectCount(equals: 1)
}
let systemUnderTest = CurrentValueSubject<Int, Never>(1)
for iteration in 2...1_000 {
systemUnderTest.send(iteration)
}
systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_streamsValuesInOrder() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
let systemUnderTest = CurrentValueSubject<Int, Never>(1)
systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
for iteration in 2...1_000 {
systemUnderTest.send(iteration)
}
XCTAssertEqual(counter.count, 1_000)
}
func test_streamValues_streamsValuesAfterInitializingLazyValue() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
final class SUTWrapper: ObservableObject {
@LazyPublished(lazyWrappedValue: { 1 })
var systemUnderTest: Int
}
let sutWrapper = SUTWrapper()
sutWrapper.$systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
for iteration in 2...1_000 {
sutWrapper.systemUnderTest = iteration
}
XCTAssertEqual(counter.count, 1_000)
}
func test_streamValues_streamsLazyValueFirst() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1_000)
counter.incrementAndExpectCount(equals: 1)
}
final class SUTWrapper: ObservableObject {
@LazyPublished(lazyWrappedValue: { 1 })
var systemUnderTest: Int
}
let sutWrapper = SUTWrapper()
for iteration in 2...1_000 {
sutWrapper.systemUnderTest = iteration
}
sutWrapper.$systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_derivesLazyValueOnlyOnce() {
final class SUTWrapper: ObservableObject {
@LazyPublished
var systemUnderTest: Int
/// - Note: Sendability is faked on this type since we know we'll only call into this type on a single queue.
final class CallCounter: @unchecked Sendable {
func increment() {
count += 1
}
var count = 0
}
init() {
_systemUnderTest = LazyPublished(lazyWrappedValue: { [callCounter] in
callCounter.increment()
return 1
})
}
let callCounter = CallCounter()
}
let sutWrapper = SUTWrapper()
let receiver = ValueReceiver<Int>() { _ in }
let publisher = sutWrapper.$systemUnderTest.eraseToAnyPublisher()
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
publisher.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
}
XCTAssertEqual(sutWrapper.callCounter.count, 1)
}
func test_streamValues_doesNotLoadLazyValueUntilAccessed() {
final class SUTWrapper: ObservableObject {
@LazyPublished
var systemUnderTest: Int
/// - Note: Sendability is faked on this type since we know we'll only call into this type on a single queue.
final class CallCounter: @unchecked Sendable {
func increment() {
count += 1
}
var count = 0
}
init() {
_systemUnderTest = LazyPublished(lazyWrappedValue: { [callCounter] in
callCounter.increment()
return 0
})
}
let callCounter = CallCounter()
}
let sutWrapper = SUTWrapper()
XCTAssertEqual(sutWrapper.callCounter.count, 0)
_ = sutWrapper.systemUnderTest
XCTAssertEqual(sutWrapper.callCounter.count, 1)
}
func test_streamValues_doesNotLoadLazyValueUntilSubscribed() {
final class SUTWrapper: ObservableObject {
@LazyPublished
var systemUnderTest: Int
/// - Note: Sendability is faked on this type since we know we'll only call into this type on a single queue.
final class CallCounter: @unchecked Sendable {
func increment() {
count += 1
}
var count = 0
}
init() {
_systemUnderTest = LazyPublished(lazyWrappedValue: { [callCounter] in
callCounter.increment()
return 0
})
}
let callCounter = CallCounter()
}
let receiver = ValueReceiver<Int>() { _ in }
let sutWrapper = SUTWrapper()
XCTAssertEqual(sutWrapper.callCounter.count, 0)
sutWrapper.$systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(sutWrapper.callCounter.count, 1)
}
func test_streamValues_doesNotStreamLazyValueIfSetFirst() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1_000)
counter.incrementAndExpectCount(equals: 1)
}
final class SUTWrapper: ObservableObject {
@LazyPublished(lazyWrappedValue: { 1 })
var systemUnderTest: Int
}
let sutWrapper = SUTWrapper()
sutWrapper.systemUnderTest = 1
for iteration in 2...1_000 {
sutWrapper.systemUnderTest = iteration
}
sutWrapper.$systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_doesNotLoadLazyValueIfSetFirst() {
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1)
}
final class SUTWrapper: ObservableObject {
@LazyPublished(lazyWrappedValue: {
XCTFail("Unexpectedly loaded lazy value")
return 0
})
var systemUnderTest: Int
}
let sutWrapper = SUTWrapper()
sutWrapper.systemUnderTest = 1
sutWrapper.$systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
}
func test_streamValues_streamsLastSetValueThenAllNewValuesInOrder() {
let counter = Counter()
let receiver = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value + 1)
}
let systemUnderTest = CurrentValueSubject<Int, Never>(0)
systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
for iteration in 1...1_000 {
systemUnderTest.send(iteration)
}
XCTAssertEqual(counter.count, 1_001)
}
func test_streamValues_doesNotStreamAllPastValues() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 1)
counter.incrementAndExpectCount(equals: value)
}
let systemUnderTest = CurrentValueSubject<Int, Never>(0)
systemUnderTest.send(1)
systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_doesNotRetainObserver() {
let counter = Counter()
var receiver: ValueReceiver? = ValueReceiver() { value in
counter.incrementAndExpectCount(equals: value)
}
let systemUnderTest = CurrentValueSubject<Int, Never>(1)
if let receiver {
systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
}
for iteration in 2...1_000 {
systemUnderTest.send(iteration)
if iteration == 500 {
receiver = nil
}
}
XCTAssertEqual(counter.count, 500)
}
func test_receiveSynchronouslyIfPossible_streamsValuesOnGivenQueue() {
let key = DispatchSpecificKey<Void>()
let queue = DispatchQueue(label: #function)
queue.setSpecific(key: key, value: ())
let expectation = self.expectation(description: #function)
let systemUnderTest = CurrentValueSubject<Int, Never>(0)
let receiver = ValueReceiver<Int>() { _ in
XCTAssertNotNil(DispatchQueue.getSpecific(key: key))
expectation.fulfill()
}
systemUnderTest
.receiveSynchronouslyIfPossible(on: queue)
.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
waitForExpectations(timeout: 1.0)
}
func test_receiveSynchronouslyIfPossible_streamsValuesSynchronouslyIfPublishedOnReceivingQueue() {
let counter = Counter()
let receiver = ValueReceiver() { value in
XCTAssertEqual(value, 0)
counter.incrementAndExpectCount(equals: 1)
}
let systemUnderTest = CurrentValueSubject<Int, Never>(0)
systemUnderTest
.receiveSynchronouslyIfPossible(on: .main)
.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
XCTAssertEqual(counter.count, 1)
}
func test_streamValues_streamsValuesOnSenderQueue() {
let key = DispatchSpecificKey<Void>()
let queue = DispatchQueue(label: #function)
queue.setSpecific(key: key, value: ())
let expectation = self.expectation(description: #function)
let receiver = ValueReceiver<Int>() { _ in
XCTAssertNotNil(DispatchQueue.getSpecific(key: key))
expectation.fulfill()
}
queue.async {
let systemUnderTest = CurrentValueSubject<Int, Never>(0)
systemUnderTest.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
}
waitForExpectations(timeout: 1.0)
}
func test_wrappedValue_returnsSetValue() {
final class SUTWrapper: ObservableObject {
@LazyPublished(lazyWrappedValue: { 1 })
var systemUnderTest: Int
}
let sutWrapper = SUTWrapper()
sutWrapper.systemUnderTest = 2
XCTAssertEqual(sutWrapper.systemUnderTest, 2)
}
func test_wrappedValue_returnsLazyValue() {
final class SUTWrapper: ObservableObject {
@LazyPublished(lazyWrappedValue: { 1 })
var systemUnderTest: Int
}
let sutWrapper = SUTWrapper()
XCTAssertEqual(sutWrapper.systemUnderTest, 1)
}
func test_wrappedValue_derivesLazyValueOnlyOnceIfCalledBeforeValueStreamStarted() {
final class SUTWrapper: ObservableObject {
@LazyPublished
var systemUnderTest: Int
/// - Note: Sendability is faked on this type since we know we'll only call into this type on a single queue.
final class CallCounter: @unchecked Sendable {
func increment() {
count += 1
}
var count = 0
}
init() {
_systemUnderTest = LazyPublished(lazyWrappedValue: { [callCounter] in
callCounter.increment()
return 1
})
}
let callCounter = CallCounter()
}
let sutWrapper = SUTWrapper()
_ = sutWrapper.systemUnderTest
let receiver = ValueReceiver<Int>() { _ in }
let publisher = sutWrapper.$systemUnderTest.eraseToAnyPublisher()
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
publisher.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
}
XCTAssertEqual(sutWrapper.callCounter.count, 1)
}
func test_wrappedValue_derivesLazyValueOnlyOnceIfCalledAfterValueStreamStarted() {
final class SUTWrapper: ObservableObject {
@LazyPublished
var systemUnderTest: Int
/// - Note: Sendability is faked on this type since we know we'll only call into this type on a single queue.
final class CallCounter: @unchecked Sendable {
func increment() {
count += 1
}
var count = 0
}
init() {
_systemUnderTest = LazyPublished(lazyWrappedValue: { [callCounter] in
callCounter.increment()
return 1
})
}
let callCounter = CallCounter()
}
let sutWrapper = SUTWrapper()
let receiver = ValueReceiver<Int>() { _ in }
let publisher = sutWrapper.$systemUnderTest.eraseToAnyPublisher()
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
publisher.sink(to: ValueReceiver.receiveValue, whileExists: receiver)
}
_ = sutWrapper.systemUnderTest
XCTAssertEqual(sutWrapper.callCounter.count, 1)
}
// MARK: Private
/// - Note: This type fakes being `Sendable`. It is up to the caller to ensure the receiving block is called in a non-dangerous way.
private final class ValueReceiver<Value>: @unchecked Sendable {
init(receiving: @escaping (Value) -> Void) {
self.receiving = receiving
}
func receiveValue(_ value: Value) {
receiving(value)
}
private let receiving: (Value) -> Void
}
private final class Counter {
func incrementAndExpectCount(equals expectedCount: Int, file: StaticString = #filePath, line: UInt = #line) {
increment()
XCTAssertEqual(expectedCount, count, file: file, line: line)
}
func increment() {
count += 1
}
var count = 0
}
}
extension AnyPublisher: @unchecked Sendable {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment