Created
February 13, 2020 00:58
-
-
Save stephanecopin/bc5d1b7db86f46f7f0e10d435dde6534 to your computer and use it in GitHub Desktop.
An implementation of `CombineLatestMany` for Combine (Swift), which takes a collection of `Publisher`s and publishes an array of their latest values.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine
import Foundation
/// The minimal set of operations a concrete lock has to provide so it can back `Lock`.
///
/// All requirements are `mutating` so that value-type implementations can touch
/// their own storage; reference-type implementations may satisfy them with
/// ordinary methods.
private protocol LockImplementation {
    // Blocking acquire / release pair.

    /// Acquires the lock.
    mutating func lock()

    /// Releases the lock.
    mutating func unlock()

    // Non-blocking acquire.

    /// Attempts to acquire the lock.
    /// - Returns: A `Bool` reporting the outcome of the attempt, as defined by the
    ///   conforming type.
    mutating func `try`() -> Bool
}
/// `LockImplementation` backed by `os_unfair_lock`.
///
/// The underlying `os_unfair_lock_s` is heap-allocated once and only ever accessed
/// through that pointer. Keeping it as an inline stored property and passing it with
/// `&` (as a struct would) does not guarantee a stable address in Swift, so the lock
/// functions could end up operating on a temporary copy. This is a reference type so
/// the allocation can be released in `deinit`.
private final class UnfairLock: LockImplementation {
    /// Stable, heap-allocated storage for the lock; owned by this instance.
    private let unfairLock: os_unfair_lock_t

    init() {
        unfairLock = os_unfair_lock_t.allocate(capacity: 1)
        unfairLock.initialize(to: os_unfair_lock_s())
    }

    deinit {
        unfairLock.deinitialize(count: 1)
        unfairLock.deallocate()
    }

    /// Acquires the lock, blocking until it is available.
    func lock() {
        os_unfair_lock_lock(unfairLock)
    }

    /// Attempts to acquire the lock without blocking.
    /// - Returns: The result of `os_unfair_lock_trylock` (`true` when the lock was acquired).
    func `try`() -> Bool {
        return os_unfair_lock_trylock(unfairLock)
    }

    /// Releases the lock.
    func unlock() {
        os_unfair_lock_unlock(unfairLock)
    }
}
/// A reference-type lock that forwards every operation to a private
/// `LockImplementation` (an `UnfairLock` created in `init`).
public final class Lock {
    /// The concrete lock doing the actual work.
    private var backing: LockImplementation

    /// Creates a lock backed by a new `UnfairLock`.
    public init() {
        backing = UnfairLock()
    }

    /// Acquires the lock by forwarding to the backing implementation.
    public func lock() {
        backing.lock()
    }

    /// Releases the lock by forwarding to the backing implementation.
    public func unlock() {
        backing.unlock()
    }

    /// Attempts to acquire the lock by forwarding to the backing implementation.
    /// - Returns: Whatever the backing implementation's `try()` reports.
    public func `try`() -> Bool {
        let acquired = backing.try()
        return acquired
    }
}
/// A reference-type box around a value whose reads and writes all go through a `Lock`.
///
/// The previous revision exposed the stored property directly, so reading `value`
/// bypassed the lock and could race with a concurrent `modify`. The storage is now
/// private and `value` takes the lock like every other accessor.
///
/// The closures passed to `modify` and `withValue` run while the lock is held.
/// NOTE(review): `Lock` in this file wraps `os_unfair_lock`, which is not recursive,
/// so touching the same `AtomicValue` (including reading `value`) from inside one of
/// those closures must be avoided.
public final class AtomicValue<Value> {
    /// Backing storage; only accessed while `lock` is held (except during `init`).
    private var storage: Value
    private let lock = Lock()

    /// A copy of the wrapped value, read while holding the lock.
    var value: Value {
        lock.lock()
        defer {
            lock.unlock()
        }
        return storage
    }

    /// Creates a box holding `value`.
    public init(_ value: Value) {
        storage = value
    }

    /// Replaces the wrapped value under the lock.
    // NOTE(review): no `infix operator .=` declaration is visible in this file;
    // one is required at file scope for this to compile — confirm it exists elsewhere.
    public static func .= (atomicValue: AtomicValue, value: Value) {
        atomicValue.modify { $0 = value }
    }

    /// Runs `modify` with in-place mutable access to the wrapped value while holding the lock.
    /// - Parameter modify: Closure that receives the wrapped value as `inout`.
    public func modify(_ modify: (inout Value) -> Void) {
        lock.lock()
        defer {
            lock.unlock()
        }
        modify(&storage)
    }

    /// Runs `getter` with the wrapped value while holding the lock.
    /// - Parameter getter: Closure that receives the current value.
    public func withValue(_ getter: (Value) -> Void) {
        lock.lock()
        defer {
            lock.unlock()
        }
        getter(storage)
    }
}
extension Publishers {
    // This type mirrors the behavior of the native `CombineLatest`, but accepts an arbitrary
    // number of publishers by taking a `Collection` of them.
    // `AtomicValue` and `Lock`, defined earlier in this file, are independent utilities and
    // may be placed in separate files if you wish.
    // This implementation is not optimized; no performance issues have been observed with it
    // so far, so no further optimization was attempted.

    /// A publisher that subscribes to every publisher in a collection and, once each of them
    /// has produced at least one value, publishes an array holding the latest value of each
    /// (in collection order) every time any of them emits.
    ///
    /// - Finishes when every upstream has finished.
    /// - Fails as soon as any upstream fails; the remaining upstreams are cancelled.
    /// - An empty collection publishes a single `[]` and finishes.
    /// - Downstream demand is not honoured: upstreams are consumed through `sink`
    ///   and every combined value is forwarded.
    public struct CombineLatestMany<PublisherCollection: Swift.Collection>: Publisher where PublisherCollection.Element: Combine.Publisher {
        public typealias Output = [PublisherCollection.Element.Output]
        public typealias Failure = PublisherCollection.Element.Failure

        /// The upstream publishers whose latest values are combined.
        public let publishers: PublisherCollection

        /// Creates a publisher combining the latest values of `publishers`.
        public init(_ publishers: PublisherCollection) {
            self.publishers = publishers
        }

        /// Attaches `subscriber`, hands it a subscription, then subscribes to every upstream.
        public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where PublisherCollection.Element.Failure == Subscriber.Failure, Subscriber.Input == Output {
            let upstreams = Array(publishers)
            guard !upstreams.isEmpty else {
                // Nothing to wait for: complete the subscription handshake, emit the
                // empty combination once and finish.
                subscriber.receive(subscription: Subscriptions.empty)
                _ = subscriber.receive([])
                subscriber.receive(completion: .finished)
                return
            }

            let coordinator = Coordinator(
                downstream: AnySubscriber(subscriber),
                upstreamCount: upstreams.count
            )
            // The subscriber must receive its subscription before any value or completion,
            // and needs it to be able to cancel the upstreams.
            subscriber.receive(subscription: coordinator)
            coordinator.subscribe(to: upstreams)
        }

        /// The subscription handed to the downstream subscriber. It owns the upstream
        /// subscriptions, tracks the latest value / completion of each upstream and
        /// stops forwarding anything once a terminal event or a cancellation occurred.
        private final class Coordinator: Subscription {
            // Recursive because the downstream may synchronously call back into
            // `cancel()` (or cause an upstream to emit) while it is being delivered an
            // event under this lock.
            private let lock = NSRecursiveLock()
            /// `nil` once the stream is terminated (failed, finished or cancelled).
            private var downstream: AnySubscriber<Output, Failure>?
            private var latestValues: [PublisherCollection.Element.Output?]
            private var hasCompleted: [Bool]
            private var cancellables: [AnyCancellable?]

            init(downstream: AnySubscriber<Output, Failure>, upstreamCount: Int) {
                self.downstream = downstream
                self.latestValues = Array(repeating: nil, count: upstreamCount)
                self.hasCompleted = Array(repeating: false, count: upstreamCount)
                self.cancellables = Array(repeating: nil, count: upstreamCount)
            }

            /// Demand is intentionally ignored; see the type documentation.
            func request(_ demand: Subscribers.Demand) {}

            /// Stops all delivery and cancels every upstream subscription still alive.
            func cancel() {
                lock.lock()
                let released = tearDown()
                lock.unlock()
                // Upstreams are cancelled after the lock has been released.
                released.forEach { $0.cancel() }
            }

            /// Subscribes to each upstream in order, stopping early if the stream
            /// terminated in the meantime (e.g. an upstream failed synchronously).
            func subscribe(to upstreams: [PublisherCollection.Element]) {
                for (index, upstream) in upstreams.enumerated() {
                    lock.lock()
                    let isActive = downstream != nil
                    lock.unlock()
                    guard isActive else { return }

                    let cancellable = upstream.sink(
                        receiveCompletion: { completion in
                            self.receive(completion: completion, from: index)
                        },
                        receiveValue: { value in
                            self.receive(value, from: index)
                        }
                    )

                    lock.lock()
                    // Only keep the subscription if it can still produce events we care
                    // about; otherwise it is dropped (and thereby cancelled) right here.
                    if downstream != nil && !hasCompleted[index] {
                        cancellables[index] = cancellable
                    }
                    lock.unlock()
                }
            }

            /// Records `value` for the upstream at `index` and forwards the combined
            /// array once every upstream has produced at least one value.
            private func receive(_ value: PublisherCollection.Element.Output, from index: Int) {
                lock.lock()
                defer {
                    lock.unlock()
                }
                guard let downstream = downstream else { return }
                latestValues[index] = value
                let snapshot = latestValues.compactMap { $0 }
                if snapshot.count == latestValues.count {
                    _ = downstream.receive(snapshot)
                }
            }

            /// Handles the completion of the upstream at `index`.
            private func receive(completion: Subscribers.Completion<Failure>, from index: Int) {
                var released: [AnyCancellable] = []
                lock.lock()
                if let downstream = downstream {
                    switch completion {
                    case .failure:
                        // Terminate first so re-entrant events are ignored, then report.
                        released = tearDown()
                        downstream.receive(completion: completion)
                    case .finished:
                        hasCompleted[index] = true
                        if let finishedUpstream = cancellables[index] {
                            released.append(finishedUpstream)
                        }
                        cancellables[index] = nil
                        if hasCompleted.allSatisfy({ $0 }) {
                            released += tearDown()
                            downstream.receive(completion: .finished)
                        }
                    }
                }
                lock.unlock()
                released.forEach { $0.cancel() }
            }

            /// Marks the stream as terminated and hands back the upstream subscriptions
            /// that were still alive. Must be called with `lock` held.
            private func tearDown() -> [AnyCancellable] {
                downstream = nil
                let live = cancellables.compactMap { $0 }
                cancellables = Array(repeating: nil, count: cancellables.count)
                return live
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment