Skip to content

Instantly share code, notes, and snippets.

@couchdeveloper
Last active December 29, 2015 12:09
Show Gist options
  • Save couchdeveloper/7669021 to your computer and use it in GitHub Desktop.
Save couchdeveloper/7669021 to your computer and use it in GitHub Desktop.
A RXStreamToStreamCopier asynchronously copies the content of a source stream into a destination stream.
//
// RXAsynchronousOperation.h
//
// Copyright 2013 Andreas Grosam
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#import <Foundation/Foundation.h>
#import <RXPromise/RXPromise.h>
/**
Defines the protocol RXAsynchronousOperation.
A concrete asynchronous operation encapsulates an asychronous task and its states
`isCancelled`, `isExecuting` and `isFinished`. An RXAsynchronousOperation class
can be implemented on top of a NSOperation - but it is strictly not required.
The eventual result will be signaled via a `RXPromise`.
An asynchronous operation SHOULD support cancellation, which aborts the asynchronous
task at the next cancellation point. The property `isCancelled` will return
YES when a cancel message has been sent to the reseiver, however the underlying
task may still executing.
The asynchronous task MUST be started through sending `start` to the asynchronous
operation. An asynchronous operation can be started only once.
An instance of RXAsynchronousOperation MUST implement a means to retain itself
as long as it is not finished in order to prevent premature deallocation when
the underlying task is still running but if there is no external variable
referencing the RXAsynchronousOperation object.
*/
@protocol RXAsynchronousOperation <NSObject>
@required
/**
Start the asynchronous operation.
If the operation is already started or has been cancelled, the method SHALL
have no effect.
*/
- (void) start;
/**
Cancels a running operation at the next cancelation point and returns
immediately. `cancel` may be send to the receiver from any thread.
If the receiver is _already_ cancelled or finished it SHALL ingore the
message.
*/
- (void) cancel;
@property (nonatomic, readonly) BOOL isCancelled;
@property (nonatomic, readonly) BOOL isExecuting;
@property (nonatomic, readonly) BOOL isFinished;
/**
Returns the result of the operation as a promise.
If the promise receives a `cancel` messages, it MUST send a corresponding cancel
message to the asynchronous operation.
*/
@property (nonatomic, readonly) RXPromise* promise;
@optional
/**
If the receiver implements this method, a cancellation from a sender MAY be
signaled through this method instead of the mandatory `cancel` method.
If the receiver is _already_ cancelled or in an error state it SHALL ingore the
message.
Signaling cancellation through this method along with a reason is the preferred
way since the receiver gets detailed information about the reason and can present
a more detailed error message to the user.
*/
- (void) cancelWithReason:(id)reason sender:(id)sender;
@end
/**
Example Implementation
Note: the following example does not employ synchronization. A real implementation
should utilize a sync queue for example to guard concurrent access to the ivars
and make them thread safe.
@interface MyAsyncOperation : NSObject <RXAsynchronousOperation>
@end
// implementation
#import "MyAsyncOperation.h"
#import <RXPromise/RXPromise.h>
@interface MyAsyncOperation ()
@property (nonatomic, readwrite) BOOL isCancelled;
@property (nonatomic, readwrite) BOOL isExecuting;
@property (nonatomic, readwrite) BOOL isFinished;
@property (nonatomic, readwrite) RXPromise* promise;
// Implementation specific properties:
@end
@implementation RXInputStreamSource
{
id _self; // used to create a circular reference to itself in order to prevent premature deallocation
RXPromise* _promise;
BOOL _isCancelled;
BOOL _isExecuting;
BOOL _isFinished;
// Implementation specific instance variables:
...
}
@synthesize promise = _promise;
@synthesize isCancelled = _isCancelled;
@synthesize isExecuting = _isExecuting;
@synthesize isFinished = _isFinished;
// Implementation specific @synthesize
...
- (id)init {
self = [super init];
if (self) {
// Implementation specific initialization
...
}
return self;
}
#pragma mark -
- (void) start {
if (_isCancelled || _isExecuting) {
return;
}
// Implementation specific: start the task
...
// Postconditions:
assert(_self == self);
assert(_isExecuting);
assert(self.promise != nil);
}
- (void) finish
{
// MUST be synchronized
if (!_isExecuting) {
return;
}
// Implementation specific: tear down
...
self.isExecuting = NO;
self.isFinished = YES;
_self = nil;
// Postconditions:
assert(_self == nil);
assert(_isExecuting == NO);
assert(_isFinished ==YES);
}
- (void) doTask
{
// do some work
if (taskIsFinished) {
[self.promise fulfillWithValue:result];
[self finish];
return;
}
else if (taskFailed) {
NSError* error = [NSError errorWithDomain:NSStringFromClass([self class])
code:-4
userInfo:@{NSLocalizedDescriptionKey: @"error occurred"}];
[self.promise rejectWithReason:error];
[self finish];
return;
}
}
// Lazy access - this is more elaborate
- (id) promise
{
// Access to _promise MUST be synchronized!
// (could be avoided if _promise will be initialized in the init method)
if (_promise == nil) {
_promise = [[RXPromise alloc] init];
_promise.then(nil, ^id(NSError* error){
if (_promise.isCancelled) {
[self cancelWithReason:error sender:nil];
}
return error;
});
}
return _promise;
}
- (void) cancel {
[self cancelWithReason:@"unknown" sender:nil];
}
- (void) cancelWithReason:(id)reason sender:(id)sender
{
[self.promise cancelWithReason:reason]; // doesn't hurt if the promise was the origin of
// the cancel message.
if (_isCancelled || _isFinished) {
return;
}
self.isCancelled = YES;
if (_isExecuting) {
[self finish];
}
}
@end
*/
//
// RXStreamToStreamCopier.h
//
// Copyright 2013 Andreas Grosam
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#import <Foundation/Foundation.h>
#import "RXAsynchronousOperation.h"
@protocol RXStreamCopierProtocol <RXAsynchronousOperation>
/**
Returns the total number of bytes copied to the destination stream
*/
@property (atomic, readonly) long long totalBytesCopied;
@end
/**
A RXStreamToStreamCopier asynchronously copies the content of a source stream into
a destination stream.
The streams will be scheduled on a run loop.
If the operation succeeds the promise will be resolved with the total bytes
copied (a NSNumber with an underlying long long).
RXStreamToStreamCopier implements protocol RXStreamCopierProtocol
*/
@interface RXStreamToStreamCopier : NSObject <RXStreamCopierProtocol>
/**
Designated Initializer.
Initializes a RXStreamToStreamCopier object.
Parameter `sourceStream` and `destinationStream` must not be nil, and both
streams shall be in a valid state.
The input stream shall be unopend. The receiver sets itself as the NSStreamDelegate
and opens the input stream. When finished, it close the stream and removes it from
the run loop.
The output stream may be opened already. The receiver sets itself as the
delegate of the output stream, opens it when necessary and schedules it on the
run loop. When finished, the receiver removes it from the run loop, but does not
close the stream.
@param sourceStream A `NSInputStream` object whose content shall be copied.
@param destinationSream A `NSOutputStream` object which is the target of the
*/
- (id) initWithSourceStream:(NSInputStream*)sourceStream
destinationStream:(NSOutputStream*)destinationStream;
/**
Returns or sets the worker thread where the actions are scheduled.
If not explicitly set, the workerThread becomes where start has been executed.
Setting the worker thread after the operation has been started has no effect.
*/
@property (nonatomic) NSThread* workerThread;
/**
Returns or sets the run loop mode where the actions are scheduled.
If not explicitly set, the run loop becomes NSDefaultRunLoopMode.
Setting the run loop mode after the operation has been started has no effect.
*/
@property (nonatomic, copy) NSString* runLoopMode;
/**
Sets or returns the size in bytes of the internal buffer.
The default size equals 64*1024.
Setting the buffer size after the operation has been started has no effect.
*/
@property (nonatomic) NSUInteger bufferSize;
/** Stream Copier Protocol
Returns the total number of bytes copied to the destination stream
@property (atomic, readonly) long long totalBytesCopied;
*/
@end
//
// RXStreamToStreamCopier.m
//
// Copyright 2013 Andreas Grosam
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#if ! __has_feature(objc_arc)
#error Automatic Reference Counting must be on
#endif
#import "RXStreamToStreamCopier.h"
//#define DEBUG_LOG_MIN 4 //enable this in order to debug verbosely
#import "Relax/utility/DLog.h"
static const int DefaultBufferSize = 64*1024;
@interface RXStreamToStreamCopier () <NSStreamDelegate>
@property (nonatomic, readwrite) BOOL isCancelled;
@property (nonatomic, readwrite) BOOL isExecuting;
@property (nonatomic, readwrite) BOOL isFinished;
@property (nonatomic, readwrite) RXPromise* promise;
@property (nonatomic) NSInputStream* sourceStream;
@property (nonatomic) NSOutputStream* destinationStream;
@property (atomic, readwrite) long long totalBytesCopied;
@end
@implementation RXStreamToStreamCopier {
id _self; // used to create a circular reference to itself in order to prevent premature deallocation
RXPromise* _promise;
BOOL _isCancelled;
BOOL _isExecuting;
BOOL _isFinished;
NSThread* _workerThread;
NSString* _runLoopMode;
NSUInteger _bufferSize;
uint8_t* _buffer;
uint8_t* _buffer_end;
uint8_t* _p; // "put-pointer" into buffer
uint8_t* _g; // "get-pointer" into buffer
long long _totalBytesCopied;
BOOL _inputEOF;
BOOL _hasBytesAvailableFlag;
BOOL _hasSpaceAvailableFlag;
}
@synthesize promise = _promise;
@synthesize isCancelled = _isCancelled;
@synthesize isExecuting = _isExecuting;
@synthesize isFinished = _isFinished;
@synthesize workerThread = _workerThread;
@synthesize runLoopMode = _runLoopMode;
@synthesize sourceStream = _srcStream;
@synthesize destinationStream =_dstStream;
@synthesize bufferSize = _bufferSize;
@synthesize totalBytesCopied = _totalBytesCopied;
- (id) initWithSourceStream:(NSInputStream*)sourceStream
destinationStream:(NSOutputStream*)destinationStream
{
NSParameterAssert(sourceStream && [sourceStream streamStatus] <= NSStreamStatusOpen);
NSParameterAssert(destinationStream && [destinationStream streamStatus] <= NSStreamStatusOpen);
self = [super init];
if (self) {
_srcStream = sourceStream;
_dstStream = destinationStream;
_totalBytesCopied = 0;
_bufferSize = DefaultBufferSize;
_runLoopMode = NSDefaultRunLoopMode;
_promise = [[RXPromise alloc] init];
_promise.then(nil, ^id(NSError* error){
if (_promise.isCancelled) {
[self cancelWithReason:error sender:nil];
}
return error;
});
}
return self;
}
- (void) dealloc {
DLogInfo(@"dealloc async object %@", self);
free(_buffer);
}
#pragma mark -
- (void) setRunLoopMode:(NSString *)runLoopMode {
if (_isCancelled || _isExecuting) {
return;
}
_runLoopMode = runLoopMode;
}
- (void) setWorkerThread:(NSThread *)workerThread {
if (_isCancelled || _isExecuting) {
return;
}
_workerThread = workerThread;
}
- (void) setBufferSize:(NSUInteger)bufferSize {
if (_isCancelled || _isExecuting) {
return;
}
_bufferSize = bufferSize;
}
#pragma mark
- (void) start
{
if (_isCancelled || _isExecuting) {
return;
}
NSAssert(_srcStream, @"source stream is nil");
NSAssert(_dstStream, @"destination stream is nil");
if (_workerThread == nil) {
_workerThread = [NSThread currentThread];
}
if (_workerThread == nil || [NSThread currentThread] == self.workerThread) {
_workerThread = [NSThread currentThread];
_self = self;
self.isExecuting = YES;
self.sourceStream.delegate = self;
[self.sourceStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:self.runLoopMode];
[self.sourceStream open];
self.destinationStream.delegate = self;
[self.destinationStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:self.runLoopMode];
NSStreamStatus ostreamStatus = self.destinationStream.streamStatus;
if (ostreamStatus == NSStreamStatusOpen) {
_hasSpaceAvailableFlag = self.destinationStream.hasSpaceAvailable;
}
if (ostreamStatus == NSStreamStatusNotOpen) {
[self.destinationStream open];
}
_buffer = (uint8_t*)malloc(_bufferSize);
_p = _buffer;
_g = _buffer;
_buffer_end = _buffer + _bufferSize;
}
else {
[self performSelector:@selector(start)
onThread:self.workerThread
withObject:nil waitUntilDone:NO
modes:[NSArray arrayWithObject:self.runLoopMode]];
}
}
- (void) finish
{
NSAssert([NSThread currentThread] == _workerThread, @"not executing on worker thread");
assert(_promise.isPending == NO);
if (!_isExecuting) {
return;
}
DLogInfo(@"finished");
[self.sourceStream close];
[self.sourceStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:self.runLoopMode];
self.sourceStream.delegate = nil;
// do not close destination stream
[self.destinationStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:self.runLoopMode];
self.destinationStream.delegate = nil;
self.isExecuting = NO;
self.isFinished = YES;
_self = nil;
}
// pull bytes from source stream to buffer
- (void) pull {
if (_p == _buffer_end) {
return; // put area is empty
}
_hasBytesAvailableFlag = NO;
NSInteger amount = [_srcStream read:_p maxLength:(NSUInteger)(_buffer_end - _p)];
if (amount > 0) {
DLogInfo(@"copied %ld bytes from source stream to buffer", (long)amount);
_p += amount;
} else if (amount == 0) {
// Source stream reached EOF
} else {
[self.promise rejectWithReason:[_srcStream streamError]];
[self finish];
return;
}
}
// push bytes from buffer to destination stream
- (void) push {
if (_p == _g) {
return; // get area is empty
}
_hasSpaceAvailableFlag = NO;
NSInteger amount = [_dstStream write:_g maxLength:(NSUInteger)(_p - _g)];
if (amount > 0) {
DLogInfo(@"copied %ld bytes from buffer to destination stream", (long)amount);
_g += amount;
_totalBytesCopied += amount;
if (_inputEOF && _g == _p) {
[self.promise fulfillWithValue:[NSNumber numberWithLongLong:_totalBytesCopied]];
[self finish];
} else if (_g == _p) {
_g = _p = _buffer;
}
} else if (amount <= 0) {
// The following will be handled in the stream delegate with event NSStreamEventErrorOccurred
//[self.promise rejectWithReason:[_dstStream streamError]];
//[self finish];
return;
}
}
- (void)stream:(NSStream *)theStream handleEvent:(NSStreamEvent)streamEvent
{
if (_isCancelled) {
return;
}
switch (streamEvent) {
case NSStreamEventNone:
break;
case NSStreamEventOpenCompleted:
DLogInfo(@"%@ stream: open completed", theStream == _srcStream ? @"source" : @"destination");
if (theStream == _dstStream) {
// TODO: remove comment!
//_hasSpaceAvailableFlag = _dstStream.hasSpaceAvailable;
}
if (theStream == _srcStream) {
_hasBytesAvailableFlag = _srcStream.hasBytesAvailable;
}
break;
case NSStreamEventHasBytesAvailable:
DLogInfo(@"%@ stream: has bytes available", theStream == _srcStream ? @"source" : @"destination");
// if put area is not empty:
if (_p < _buffer_end) {
// transfer bytes from source stream to buffer
[self pull];
} else {
// The source-stream has bytes available but the put-area of the buffer
// is empty. Bytes must be transfered from buffer to the destination-stream
// first. Delay transfering bytes from the source-stream to the buffer and
// set _hasBytesAvailableFlag.
// Now, we won't get NSStreamEventHasBytesAvailable events anymore, unless
// we read again from _srcStream.
_hasBytesAvailableFlag = YES;
}
// Trigger a push if the destination stream can accept bytes and the get area is not empty:
// (That is, a push will be triggered for any number of bytes available in the buffer)
if (_hasSpaceAvailableFlag && _g != _p) {
[self push];
}
break;
case NSStreamEventHasSpaceAvailable:
DLogInfo(@"%@ stream: has space available", theStream == _srcStream ? @"source" : @"destination");
// if get area is not empty:
if (_g != _p) {
// transfer bytes from buffer to destination stream
[self push];
} else {
// The destination-stream has space available, but the get-area of the buffer
// is empty. Delay transfering bytes from buffer to the destination-stream
// and set _hasSpaceAvailableFlag.
// Now, we won't get NSStreamEventHasSpaceAvailable events anymore, unless
// we write again into _dstStream.
_hasSpaceAvailableFlag = YES;
}
// Trigger a pull if the source has bytes available and the get-area is empty
// (rather than when the put area is not full in order to reduce context switches.
// That means, we only pull again after the destination has gotten all bytes in the buffer):
if (_hasBytesAvailableFlag && _p == _g) {
[self pull];
}
break;
case NSStreamEventErrorOccurred:
DLogInfo(@"%@ stream: error occurred", theStream == _srcStream ? @"source" : @"destination");
[self.promise rejectWithReason:[theStream streamError]];
[self finish];
break;
case NSStreamEventEndEncountered:
DLogInfo(@"%@ stream: EOF encountered", theStream == _srcStream ? @"source" : @"destination");
if (theStream == _srcStream) {
_inputEOF = YES;
if (_g == _p) {
// write buffer is empty - finished:
[self.promise fulfillWithValue:[NSNumber numberWithLongLong:_totalBytesCopied]];
[self finish];
}
} else {
// weird error: the output stream is full or closed prematurely, or canceled.
NSError* err = [self makeErrorWithDomain:@"RXStreamToStreamCopier" code:-2 description:@"output stream EOF encountered"];
[self.promise rejectWithReason:err];
[self finish];
}
break;
}
}
- (NSError*) makeErrorWithDomain:(NSString*)errorDomain code:(NSInteger)code
description:(NSString*)description
{
NSError* error = [NSError errorWithDomain:errorDomain
code:code
userInfo:[NSDictionary dictionaryWithObjectsAndKeys:
description, NSLocalizedDescriptionKey,
nil]];
return error;
}
#pragma mark -
- (void) cancel {
[self cancelWithReason:@"unknown" sender:nil];
}
- (void) cancelWithReason:(id)reason sender:(id)sender {
[self.promise cancelWithReason:reason]; // doesn't hurt if the promise was the origin of the cancel message.
if (_workerThread == nil || ([NSThread currentThread] == _workerThread)) {
if (_isCancelled || _isFinished) {
return;
}
self.isCancelled = YES;
if (_isExecuting) {
[self finish];
}
}
else {
// Never access a scheduled stream from any other than the run loop's thread:
[self performSelector:@selector(cancel)
onThread:self.workerThread
withObject:nil waitUntilDone:NO
modes:@[self.runLoopMode]];
}
}
@end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment