Last active
September 7, 2021 20:04
-
-
Save dhoerl/57fe946f95b647184c38b5c3942fc32c to your computer and use it in GitHub Desktop.
Combine Sample Publisher for Medium Article
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
enum SPErrors: Error { | |
case inputStringWasEmpty | |
} | |
struct StringPublisher: Publisher { | |
typealias Output = [Character] | |
typealias Failure = Error | |
private let data: [Character] | |
init(string: String) { | |
self.data = string.map({$0}) | |
} | |
func receive<S>(subscriber: S) where | |
S: Subscriber, | |
S.Failure == Self.Failure, | |
S.Input == Self.Output | |
{ | |
let subscription = StringPublisherSubscription(subscriber: subscriber, data: data) | |
subscriber.receive(subscription: subscription) | |
} | |
final class StringPublisherSubscription<S>: Subscription where | |
S: Subscriber, | |
S.Input == [Character], | |
S.Failure == Error | |
{ | |
private var subscriber: S? | |
private var data: [Character] | |
private var runningDemand: Subscribers.Demand = .max(0) | |
private var isFinished = false | |
private var isProcessingRequest = false // make this Atomic to be threadsafe | |
init(subscriber: S, data: [Character]) { | |
self.subscriber = subscriber | |
self.data = data | |
} | |
func request(_ demand: Subscribers.Demand) { | |
guard !isFinished else { return } | |
guard let subscriber = subscriber else { return } | |
guard data.count > 0 else { return sendError(.inputStringWasEmpty) } | |
runningDemand += demand | |
if isProcessingRequest == true { | |
return | |
} else { | |
isProcessingRequest = true | |
} | |
while runningDemand > 0 && !data.isEmpty { | |
let count = computeSendCount() | |
let tempData: [Character] = Array( data.prefix(upTo: count) ) | |
let stillDesired = subscriber.receive(tempData) | |
// Only update counts and data AFTER sending receive | |
data.removeSubrange(0..<count) | |
runningDemand -= count | |
if let runningDesired = runningDemand.max, let stillDesired = stillDesired.max { | |
assert(runningDesired == stillDesired) | |
} | |
} | |
if data.isEmpty { | |
subscriber.receive(completion: .finished) | |
isFinished = true | |
} | |
isProcessingRequest = false | |
} | |
private func sendError(_ error: SPErrors) { | |
subscriber?.receive(completion: .failure(error)) | |
} | |
private func computeSendCount() -> Int { | |
let count: Int | |
if let demand = runningDemand.max { | |
count = Swift.min(data.count, demand) | |
} else { | |
count = data.count | |
} | |
return count | |
} | |
func cancel() { | |
isFinished = true | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment