Last active
December 22, 2016 16:15
-
-
Save jspahrsummers/9756674 to your computer and use it in GitHub Desktop.
Non-blocking mutual exclusion with readers and writers, for asynchronous operations that may span more than one GCD block (making a single concurrent GCD queue unsuitable).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// GHReadWriteQueue.h | |
// GitHub | |
// | |
// Created by Justin Spahr-Summers on 2014-03-24. | |
// Copyright (c) 2014 GitHub. All rights reserved. | |
// | |
#import <Foundation/Foundation.h> | |
/// A read-write lock that synchronizes work at the level of signals instead of | |
/// blocks, and without blocking any queues. | |
@interface GHReadWriteQueue : NSObject | |
/// Initializes the queue with a debugging name. | |
/// | |
/// This is the designated initializer for this class. | |
/// | |
/// name - The debug name for this queue. Must not be nil. | |
- (instancetype)initWithName:(NSString *)name; | |
/// Enqueues an operation that can run concurrently with other operations. | |
/// | |
/// workSignal - A signal to subscribe to once a concurrent lock is obtained. This | |
/// signal should avoid performing synchronous work, as it will occupy | |
/// the queue until returning. Once the signal has sent `error` or | |
/// `completed`, the concurrent lock will be released. This argument | |
/// must not be nil. | |
/// | |
/// Returns a signal that will enqueue `workSignal` once for each new | |
/// subscription, subscribe to it when a concurrent lock is obtained, then | |
/// forward all events. | |
- (RACSignal *)addConcurrentSignal:(RACSignal *)workSignal; | |
/// Enqueues an operation that must run serially with respect to all other | |
/// operations. | |
/// | |
/// Exclusive operations have barrier semantics. Once an exclusive operation has | |
/// been enqueued, no operations enqueued thereafter will be allowed to begin | |
/// until it has finished executing. | |
/// | |
/// workSignal - A signal to subscribe to once the exclusive lock is obtained. This | |
/// signal should avoid performing synchronous work, as it will occupy | |
/// the queue until returning. Once the signal has sent `error` or | |
/// `completed`, the exclusive lock will be released. This argument | |
/// must not be nil. | |
/// | |
/// Returns a signal that will enqueue `workSignal` once for each new | |
/// subscription, subscribe to it when the exclusive lock is obtained, then | |
/// forward all events. | |
- (RACSignal *)addExclusiveSignal:(RACSignal *)workSignal; | |
@end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// GHReadWriteQueue.m | |
// GitHub | |
// | |
// Created by Justin Spahr-Summers on 2014-03-24. | |
// Copyright (c) 2014 GitHub. All rights reserved. | |
// | |
#import "GHReadWriteQueue.h" | |
@interface GHReadWriteQueue () | |
// The name for this queue. | |
@property (nonatomic, copy, readonly) NSString *name; | |
// A concurrent GCD queue upon which new operations are scheduled. | |
// | |
// In other words, a barrier block on this queue (while running) will prevent | |
// new operations from being started. | |
@property (nonatomic, readonly) dispatch_queue_t operationSchedulingQueue; | |
// A group entered whenever a concurrent operation has been started. | |
// | |
// When this group is empty, there are no concurrent operations in-flight. | |
// However, the `operationSchedulingQueue` must be suspended or blocked to | |
// guarantee that condition remains true. | |
@property (nonatomic, readonly) dispatch_group_t concurrentOperationsGroup; | |
@end | |
@implementation GHReadWriteQueue | |
#pragma mark Lifecycle | |
- (instancetype)initWithName:(NSString *)name { | |
NSParameterAssert(name != nil); | |
self = [super init]; | |
if (self == nil) return nil; | |
_name = [name copy]; | |
_concurrentOperationsGroup = dispatch_group_create(); | |
NSString *queueLabel = [self.name stringByAppendingString:@".operationSchedulingQueue"]; | |
_operationSchedulingQueue = dispatch_queue_create(queueLabel.UTF8String, DISPATCH_QUEUE_CONCURRENT); | |
// Starting new operations is low priority compared to finishing up existing | |
// work. | |
dispatch_set_target_queue(_operationSchedulingQueue, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0)); | |
return self; | |
} | |
- (void)dealloc { | |
if (_concurrentOperationsGroup != NULL) { | |
dispatch_release(_concurrentOperationsGroup); | |
_concurrentOperationsGroup = NULL; | |
} | |
if (_operationSchedulingQueue != NULL) { | |
dispatch_release(_operationSchedulingQueue); | |
_operationSchedulingQueue = NULL; | |
} | |
} | |
#pragma mark Queuing | |
- (RACSignal *)addConcurrentSignal:(RACSignal *)workSignal { | |
NSParameterAssert(workSignal != nil); | |
return [[RACSignal | |
createSignal:^(id<RACSubscriber> subscriber) { | |
RACDisposable *disposable = [[RACDisposable alloc] init]; | |
// Attempt to schedule a new concurrent operation. This queue | |
// will be suspended while an exclusive operation is running, so | |
// the block may not run immediately. | |
dispatch_async(self.operationSchedulingQueue, ^{ | |
if (disposable.disposed) return; | |
// Indicate that our operation is executing. | |
dispatch_group_enter(self.concurrentOperationsGroup); | |
// When the operation finishes, indicate that it's no longer | |
// executing. | |
// | |
// Order is important here! This should happen _after_ notifying | |
// the subscriber about termination events, in case it does any | |
// work that uses the repository. | |
void (^finished)(void) = ^{ | |
dispatch_group_leave(self.concurrentOperationsGroup); | |
}; | |
[workSignal subscribeNext:^(id x) { | |
[subscriber sendNext:x]; | |
} error:^(NSError *error) { | |
[subscriber sendError:error]; | |
finished(); | |
} completed:^{ | |
[subscriber sendCompleted]; | |
finished(); | |
}]; | |
}); | |
return disposable; | |
}] | |
setNameWithFormat:@"%@ %s", self, sel_getName(_cmd)]; | |
} | |
- (RACSignal *)addExclusiveSignal:(RACSignal *)workSignal { | |
NSParameterAssert(workSignal != nil); | |
return [[RACSignal | |
createSignal:^(id<RACSubscriber> subscriber) { | |
RACDisposable *disposable = [[RACDisposable alloc] init]; | |
// Wait for all outstanding operations to finish being scheduled. | |
dispatch_barrier_async(self.operationSchedulingQueue, ^{ | |
if (disposable.disposed) return; | |
// Then, disable any further scheduling. | |
dispatch_suspend(self.operationSchedulingQueue); | |
// When the operation finishes, resume scheduling. | |
// | |
// Order is important here! This should happen _after_ notifying | |
// the subscriber about termination events, in case it does any | |
// work that uses the repository. | |
void (^finished)(void) = ^{ | |
dispatch_resume(self.operationSchedulingQueue); | |
}; | |
// Once all concurrent operations finish, start a new exclusive | |
// operation. | |
// | |
// We know there are no exclusive operations in flight, because | |
// the `operationSchedulingQueue` would have been suspended (and | |
// this code therefore wouldn't be running) if that were the | |
// case. | |
dispatch_group_notify(self.concurrentOperationsGroup, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{ | |
if (disposable.disposed) { | |
finished(); | |
return; | |
} | |
[workSignal subscribeNext:^(id x) { | |
[subscriber sendNext:x]; | |
} error:^(NSError *error) { | |
[subscriber sendError:error]; | |
finished(); | |
} completed:^{ | |
[subscriber sendCompleted]; | |
finished(); | |
}]; | |
}); | |
}); | |
return disposable; | |
}] | |
setNameWithFormat:@"%@ %s", self, sel_getName(_cmd)]; | |
} | |
#pragma mark NSObject | |
- (NSString *)description { | |
return [NSString stringWithFormat:@"<%@: %p>{ name = %@ }", self.class, self, self.name]; | |
} | |
@end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment