Skip to content

Instantly share code, notes, and snippets.

@dfed
Last active September 23, 2022 23:28
Show Gist options
  • Save dfed/8d072340c38fb65f8a33c3883d41844e to your computer and use it in GitHub Desktop.
Save dfed/8d072340c38fb65f8a33c3883d41844e to your computer and use it in GitHub Desktop.
import Combine
public final class CurrentValueStream<T: Sendable>: @unchecked Sendable {
public init() {
subject = CurrentValueSubject<T?, Never>(nil)
}
public init(initialValue: T) {
subject = CurrentValueSubject<T?, Never>(initialValue)
}
public func sendValue(_ value: T) {
subject.send(value)
}
public func terminate() {
subject.send(completion: .finished)
}
public var stream: AsyncStream<T> {
AsyncStream { continuation in
let subscription = subject.sink(receiveCompletion: { _ in
continuation.finish()
}, receiveValue: { value in
guard let value = value else { return }
continuation.yield(value)
})
continuation.onTermination = { _ in
// Store the subscription until the stream is termianted.
_ = subscription
}
}
}
// CurrentValueSubject is not sendable, but since we never use the `value` accessor we can consider ourselves sendable.
private let subject: CurrentValueSubject<T?, Never>
}
public func +=<T: Sendable> (lhs: CurrentValueStream<T>, rhs: T) {
lhs.sendValue(rhs)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment