Last active
September 6, 2023 20:54
-
-
Save pexavc/5cbd40571f2edb80968331e4002d1b95 to your computer and use it in GitHub Desktop.
Slightly modified version of the code from: https://forums.swift.org/t/how-to-pause-and-resume-a-combine-publisher/57919
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
extension Publisher {
    /// Creates a `PausableSinkSubscriber` from the two closures and subscribes it to this publisher.
    ///
    /// - Parameters:
    ///   - receiveValue: Closure handed to the subscriber for element delivery.
    ///   - receiveCompletion: Closure handed to the subscriber for the completion event.
    /// - Returns: The subscriber that was just subscribed. The caller drives delivery through
    ///   its `state` property and can stop it with `cancel()`.
    func pausableSink(
        receiveValue: @escaping (Output) -> Void,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
    ) -> PausableSinkSubscriber<Output, Failure> {
        let sink: PausableSinkSubscriber<Output, Failure> = .init(
            receiveValue: receiveValue,
            receiveCompletion: receiveCompletion
        )
        subscribe(sink)
        return sink
    }

    /// Creates a `PausableSinkSubscriber` from a value closure only and subscribes it to this
    /// publisher. Available when the publisher cannot fail (`Failure == Never`).
    ///
    /// - Parameter receiveValue: Closure handed to the subscriber for element delivery.
    /// - Returns: The subscriber that was just subscribed. The caller drives delivery through
    ///   its `state` property and can stop it with `cancel()`.
    func pausableSink(
        _ receiveValue: @escaping (Output) -> Void
    ) -> PausableSinkSubscriber<Output, Failure> where Failure == Never {
        let sink: PausableSinkSubscriber<Output, Failure> = .init(
            receiveValue: receiveValue
        )
        subscribe(sink)
        return sink
    }
}
/// A sink-style `Subscriber` whose delivery is controlled through `state`.
///
/// - `.normal`: received values are passed straight to `receiveValue`.
/// - `.paused`: received values are appended to `buffer`; a completion that arrives is held
///   back. When `state` changes from `.paused` to `.normal` the buffer is replayed in order and
///   the held completion (if any) is delivered afterwards.
/// - `.idle` (the initial state) and `.stopped`: received values are dropped.
///
/// The class performs no synchronization (see the TODO on `state`); callers are expected to
/// use it from a single thread/queue.
class PausableSinkSubscriber<Input, Failure: Error>: Subscriber, Cancellable {
    /// The upstream subscription. `nil` before subscribing, after `cancel()`, and once the
    /// upstream has completed.
    var subscription: Subscription?

    /// Delivery modes of the subscriber.
    enum State {
        /// Initial state; received values are dropped.
        case idle
        /// Received values are dropped.
        case stopped
        /// Received values are buffered instead of delivered.
        case paused
        /// Received values are delivered immediately.
        case normal
    }

    //TODO: atomic
    /// Current delivery mode. Leaving `.paused` triggers the deferred work: a switch to
    /// `.normal` replays `buffer`, and any completion held while paused is then delivered.
    var state: State = .idle {
        didSet {
            guard oldValue == .paused, state != .paused else { return }
            if state == .normal {
                sendBuffer()
            }
            // `sendBuffer()` may have been interrupted by a re-entrant state change, so the
            // helper re-checks `state` before delivering anything.
            deliverPendingCompletion()
        }
    }

    /// Not read or written by this class; kept so existing callers that reference it still compile.
    var isStopped = false

    /// Values received while `.paused`, oldest first.
    var buffer = [Input]()

    /// Closure invoked for every delivered value.
    var receiveValue: (Input) -> Void

    /// Closure invoked with the upstream completion, if one was supplied.
    var receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)?

    /// Completion received while `.paused`, waiting to be delivered after the buffered values.
    private var pendingCompletion: Subscribers.Completion<Failure>?

    /// Creates a subscriber that reports both values and the completion.
    ///
    /// - Parameters:
    ///   - receiveValue: Called for every delivered value.
    ///   - receiveCompletion: Called once with the upstream completion.
    init(
        receiveValue: @escaping (Input) -> Void,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
    ) {
        self.receiveValue = receiveValue
        self.receiveCompletion = receiveCompletion
    }

    /// Creates a subscriber that reports values only; the completion is ignored.
    ///
    /// - Parameter receiveValue: Called for every delivered value.
    init(
        receiveValue: @escaping (Input) -> Void
    ) {
        self.receiveValue = receiveValue
        self.receiveCompletion = nil
    }

    deinit {
        cancel()
    }

    /// Stores the subscription and requests unlimited demand.
    ///
    /// If a subscription is already held, the new one is cancelled and ignored so that the
    /// subscriber never receives values from two upstreams at once.
    func receive(subscription: Subscription) {
        guard self.subscription == nil else {
            subscription.cancel()
            return
        }
        self.subscription = subscription
        subscription.request(.unlimited)
    }

    /// Handles one upstream value according to the current `state`.
    ///
    /// - Returns: `.none`; unlimited demand was already requested in `receive(subscription:)`.
    func receive(_ input: Input) -> Subscribers.Demand {
        switch state {
        case .normal:
            receiveValue(input)
        case .paused:
            buffer.append(input)
        case .idle, .stopped:
            break
        }
        return demand
    }

    /// Delivers buffered values in arrival order for as long as `state` stays `.normal`.
    ///
    /// Each value is removed from `buffer` before its callback runs, so a callback that pauses,
    /// stops, or cancels the subscriber halts the replay and leaves `buffer` consistent.
    func sendBuffer() {
        while state == .normal, !buffer.isEmpty {
            receiveValue(buffer.removeFirst())
        }
    }

    /// Handles the upstream completion.
    ///
    /// While `.paused` the completion is held until the subscriber leaves `.paused`, so that it
    /// is never reported ahead of values still waiting in `buffer`. In every other state it is
    /// forwarded immediately. The subscription is released in both cases.
    func receive(completion: Subscribers.Completion<Failure>) {
        subscription = nil
        if state == .paused {
            pendingCompletion = completion
        } else {
            receiveCompletion?(completion)
        }
    }

    /// Additional demand returned from `receive(_:)`. Always `.none`, in every state, because
    /// the initial request is already `.unlimited`.
    var demand: Subscribers.Demand {
        .none
    }

    /// Cancels the upstream subscription and discards everything still waiting for delivery
    /// (buffered values and a held completion), so nothing is delivered after cancellation.
    func cancel() {
        subscription?.cancel()
        subscription = nil
        buffer.removeAll()
        pendingCompletion = nil
    }

    /// Delivers the completion held while paused, if there is one and the subscriber is no
    /// longer paused. Any values still buffered at that point are discarded, because no value
    /// may follow a completion.
    private func deliverPendingCompletion() {
        guard state != .paused, let completion = pendingCompletion else { return }
        pendingCompletion = nil
        buffer.removeAll()
        receiveCompletion?(completion)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment