Last active
September 23, 2022 23:28
-
-
Save dfed/8d072340c38fb65f8a33c3883d41844e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
public final class CurrentValueStream<T: Sendable>: @unchecked Sendable { | |
public init() { | |
subject = CurrentValueSubject<T?, Never>(nil) | |
} | |
public init(initialValue: T) { | |
subject = CurrentValueSubject<T?, Never>(initialValue) | |
} | |
public func sendValue(_ value: T) { | |
subject.send(value) | |
} | |
public func terminate() { | |
subject.send(completion: .finished) | |
} | |
public var stream: AsyncStream<T> { | |
AsyncStream { continuation in | |
let subscription = subject.sink(receiveCompletion: { _ in | |
continuation.finish() | |
}, receiveValue: { value in | |
guard let value = value else { return } | |
continuation.yield(value) | |
}) | |
continuation.onTermination = { _ in | |
// Store the subscription until the stream is termianted. | |
_ = subscription | |
} | |
} | |
} | |
// CurrentValueSubject is not sendable, but since we never use the `value` accessor we can consider ourselves sendable. | |
private let subject: CurrentValueSubject<T?, Never> | |
} | |
public func +=<T: Sendable> (lhs: CurrentValueStream<T>, rhs: T) { | |
lhs.sendValue(rhs) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment