Created
December 6, 2022 08:33
-
-
Save simme/3d790dbb89e5bbcf74d7603f509ecda5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import CloudKit | |
import Combine | |
/// A publisher that wraps a `CKFetchRecordZoneChangesOperation` and emits events as the operation completes. | |
/// | |
/// The `FetchRecordZoneChangesPublisher` fetches changes from the given record zones. New and deleted records are | |
/// posted individually via the `.recordChanged` and `.recordDeleted` actions. | |
/// | |
/// Errors are automatically retried if possible. Resetting the change token in case it expired is also automatically | |
/// handled. Because emitting errors fails a publisher all errors are posted as actions. There may still be running | |
/// operations, even if an error occurs. | |
/// | |
/// The publisher emits a completion event once all running operations have finished. | |
struct FetchRecordZoneChangesPublisher: Publisher { | |
// MARK: Types | |
/// The events emitted by the publisher. | |
enum Action { | |
/// Posted when the record zone change token is updated. | |
case recordZoneChangeTokenUpdated(zoneID: CKRecordZone.ID, token: CKServerChangeToken?, clientToken: Data?) | |
/// Posted when an updated record is received from the server. | |
case recordChanged(CKRecord) | |
/// Posted when a record has been deleted on the server. | |
case recordDeleted(CKRecord.ID, CKRecord.RecordType) | |
/// Posted when the fetch is complete for a specific zone. | |
case zoneFetchComplete(CKRecordZone.ID, CKServerChangeToken?, Data?, Bool) | |
/// An error occured and changes for the associated zone will be fetched again. | |
case retryingZone(CKRecordZone.ID) | |
/// An unrecoverable error occured. | |
case unrecoverableError(CKRecordZone.ID?, Error) | |
} | |
// MARK: Properties | |
/// The CloudKit database to perform the operatoin on. | |
private let database: CKDatabase | |
/// A queue for the operation. | |
private let queue: OperationQueue | |
/// A dictionary of zones and their respective change tokens. | |
private let zoneTokens: [CKRecordZone.ID: CKServerChangeToken?] | |
// MARK: Initialization | |
/// Creates a new `FetchRecordZoneChangesPublisher`. | |
/// | |
/// - Parameter database: The cloud kit database to run the operation on. | |
/// - Parameter queue: The operation queue responsible for executing the operation. | |
/// - Parameter zoneTokens: A dictionary of zones and their respective change tokens. | |
/// | |
/// - Returns: A new `FetchRecordZoneChangesPublisher`. | |
init(in database: CKDatabase, on queue: OperationQueue, zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]) { | |
self.database = database | |
self.queue = queue | |
self.zoneTokens = zoneTokens | |
} | |
// MARK: Publisher Implementation | |
func receive<S>(subscriber: S) where S: Subscriber, Never == S.Failure, Action == S.Input { | |
let subscription = Subscription( | |
subscriber: subscriber, | |
zoneTokens: zoneTokens, | |
in: database, | |
on: queue | |
) | |
subscriber.receive(subscription: subscription) | |
} | |
typealias Output = Action | |
typealias Failure = Never | |
} | |
// MARK: - Subscription | |
private extension FetchRecordZoneChangesPublisher { | |
/// The subscription wraps the actual operation execution and emits actions to its subscriber. | |
final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure { | |
// MARK: Properties | |
/// The active subscriber receiving input, if any. | |
private var subscriber: S? | |
/// The operation queue to execute the operations on. | |
private let queue: OperationQueue | |
/// The cloud kit database to run the operations against. | |
private let database: CKDatabase | |
/// All in-flight operations. | |
private var operations: [CKFetchRecordZoneChangesOperation] = [] | |
/// A list of zones and their respective tokens. May change as zones are retried. | |
private var zoneTokens: [CKRecordZone.ID: CKServerChangeToken?] | |
/// `true` if all changes should be fetched. The consumer is responsible for kicking off more operations if | |
/// necessary to collect all data. | |
private let fetchAllChanges: Bool | |
/// Internal state, makes sure we only start one operation initially. | |
private var didStart: Bool = false | |
// MARK: Initialization | |
/// Creates a new `FetchRecordZoneChangesPublisher.Subscription`. | |
/// | |
/// - Parameter subscriber: The subscriber to notify. | |
/// - Parameter zoneTkens: A list of zones and their respective tokens. May change as zones are retried. | |
/// - Parameter fetchAllChanges: `true` if all changes should be fetched. The consumer is responsible for kicking | |
/// off more operations if necessary to collect all data. | |
/// - Parameter database: The CloudKit database to perform the operation on. | |
/// - Parameter queue: An operation queue to run the operation. | |
/// | |
/// - Returns: A new `FetchRecordZoneChangesPublisher.Subscriber`. | |
init( | |
subscriber: S, | |
zoneTokens: [CKRecordZone.ID: CKServerChangeToken?], | |
fetchAllChanges: Bool = true, | |
in database: CKDatabase, | |
on queue: OperationQueue | |
) { | |
self.subscriber = subscriber | |
self.database = database | |
self.zoneTokens = zoneTokens | |
self.fetchAllChanges = fetchAllChanges | |
self.queue = queue | |
} | |
/// Configures an operation and sets up the callbacks to send events. | |
/// | |
/// - Parameter zoneTokens: A list of zones and their respective tokens. | |
/// | |
/// - Returns: A new `CKFetchRecordZoneChangesOperation`. | |
private func configureOperation( | |
zoneTokens: [CKRecordZone.ID: CKServerChangeToken?] | |
) -> CKFetchRecordZoneChangesOperation { | |
let configurations = Dictionary(uniqueKeysWithValues: zoneTokens.map { id, token in | |
(id, CKFetchRecordZoneChangesOperation.ZoneConfiguration( | |
previousServerChangeToken: token, | |
resultsLimit: nil, | |
desiredKeys: nil | |
)) | |
}) | |
let operation = CKFetchRecordZoneChangesOperation( | |
recordZoneIDs: Array(zoneTokens.keys), | |
configurationsByRecordZoneID: configurations | |
) | |
operation.fetchAllChanges = fetchAllChanges | |
operation.database = database | |
operation.qualityOfService = .userInitiated | |
operation.recordZoneChangeTokensUpdatedBlock = { [weak self] zoneID, token, clientToken in | |
self?.zoneTokens[zoneID] = token | |
_ = self?.subscriber?.receive(.recordZoneChangeTokenUpdated( | |
zoneID: zoneID, | |
token: token, | |
clientToken: clientToken | |
)) | |
} | |
operation.recordChangedBlock = { [weak self] record in | |
_ = self?.subscriber?.receive(.recordChanged(record)) | |
} | |
operation.recordWithIDWasDeletedBlock = { [weak self] id, type in | |
_ = self?.subscriber?.receive(.recordDeleted(id, type)) | |
} | |
operation.recordZoneFetchCompletionBlock = { [weak self] zoneID, changeToken, clientToken, moreComing, error in | |
if let error = error { | |
self?.handleError(error, for: zoneID) | |
} else { | |
_ = self?.subscriber?.receive(.zoneFetchComplete(zoneID, changeToken, clientToken, moreComing)) | |
} | |
} | |
operation.fetchRecordZoneChangesCompletionBlock = { [weak self] error in | |
guard let strongSelf = self else { return } | |
_ = strongSelf.operations.firstIndex(of: operation).map { strongSelf.operations.remove(at: $0) } | |
if let error = error { | |
strongSelf.handleError(error, for: nil) | |
} else { | |
if strongSelf.operations.isEmpty { | |
self?.subscriber?.receive(completion: .finished) | |
} | |
} | |
} | |
self.operations.append(operation) | |
return operation | |
} | |
/// Handle CloudKit errors. | |
/// | |
/// - Parameter error: The error to handle. | |
/// - Parameter zoneID: The ID of the zone in which the error occured. | |
private func handleError(_ error: Error, for zoneID: CKRecordZone.ID?) { | |
if error.isCloudKitTokenExpiredError, let zoneID = zoneID { | |
_ = subscriber?.receive(.recordZoneChangeTokenUpdated(zoneID: zoneID, token: nil, clientToken: nil)) | |
_ = subscriber?.receive(.retryingZone(zoneID)) | |
let newOperation = configureOperation(zoneTokens: [zoneID: nil]) | |
queue.addOperation(newOperation) | |
} else if let retryDelay = error.delayIfRetryPossible(), let zoneID = zoneID { | |
queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in | |
let token = self?.zoneTokens[zoneID] ?? nil | |
guard let newOperation = self?.configureOperation(zoneTokens: [zoneID: token]) else { return } | |
self?.queue.addOperation(newOperation) | |
} | |
} else { | |
_ = subscriber?.receive(.unrecoverableError(zoneID, error)) | |
} | |
} | |
} | |
} | |
// MARK: - | |
extension FetchRecordZoneChangesPublisher.Subscription: Cancellable { | |
func cancel() { | |
subscriber = nil | |
for operation in operations { | |
operation.cancel() | |
} | |
} | |
} | |
extension FetchRecordZoneChangesPublisher.Subscription: Subscription { | |
func request(_ demand: Subscribers.Demand) { | |
guard subscriber != nil else { return } | |
if demand > 0 && !didStart { | |
let operation = configureOperation(zoneTokens: zoneTokens) | |
queue.addOperation(operation) | |
didStart = true | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment