Skip to content

Instantly share code, notes, and snippets.

@pexavc
Last active September 6, 2023 20:54
Show Gist options
  • Save pexavc/5cbd40571f2edb80968331e4002d1b95 to your computer and use it in GitHub Desktop.
Save pexavc/5cbd40571f2edb80968331e4002d1b95 to your computer and use it in GitHub Desktop.
import Foundation
import Combine
extension Publisher {
func pausableSink(
receiveValue: @escaping (Output) -> Void,
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
) -> PausableSinkSubscriber<Output, Failure> {
let subscriber = PausableSinkSubscriber<Output, Failure>(
receiveValue: receiveValue,
receiveCompletion: receiveCompletion
)
self.subscribe(subscriber)
return subscriber
}
func pausableSink(
_ receiveValue: @escaping (Output) -> Void
) -> PausableSinkSubscriber<Output, Failure> where Failure == Never {
let subscriber = PausableSinkSubscriber<Output, Failure>(
receiveValue: receiveValue
)
self.subscribe(subscriber)
return subscriber
}
}
class PausableSinkSubscriber<Input, Failure: Error>: Subscriber, Cancellable {
var subscription: Subscription?
enum State {
case idle
case stopped
case paused
case normal
}
//TODO: atomic
var state: State = .idle {
didSet {
if oldValue == .paused && state == .normal {
sendBuffer()
}
}
}
var isStopped = false
var buffer = [Input]()
var receiveValue: (Input) -> Void
var receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)?
init(
receiveValue: @escaping (Input) -> Void,
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
) {
self.receiveValue = receiveValue
self.receiveCompletion = receiveCompletion
}
init(
receiveValue: @escaping (Input) -> Void
) {
self.receiveValue = receiveValue
self.receiveCompletion = nil
}
deinit {
cancel()
}
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Subscribers.Demand {
switch state {
case .normal:
receiveValue(input)
case .paused:
buffer.append(input)
default:
break
}
return self.demand
}
func sendBuffer() {
while !buffer.isEmpty {
let value = buffer.remove(at: 0)
receiveValue(value)
}
}
func receive(completion: Subscribers.Completion<Failure>) {
receiveCompletion?(completion)
}
var demand: Subscribers.Demand {
switch state {
case .paused, .stopped:
return .none
default:
return .none
}
}
func cancel() {
subscription?.cancel()
subscription = nil
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment