Last active
August 29, 2015 14:15
-
-
Save fpillet/6ccd2c79a0eb3ebae796 to your computer and use it in GitHub Desktop.
Two RACSignal operations I use that are not in the main ReactiveCocoa distribution
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Created by Florent Pillet on 30/01/15. | |
// | |
#import <Foundation/Foundation.h> | |
@interface RACSignal (FPOperations)

/// Forwards the receiver's `next`s with a minimum of `interval` seconds
/// between two values. Of the `next` values produced by the receiver within
/// a period of `interval` seconds, only the last one is kept and sent by the
/// returned signal; intermediate values are discarded.
///
/// A value that arrives while no interval is pending is forwarded right away,
/// synchronously, from wherever the receiver delivered it. Values that had to
/// be held back are delivered later on `scheduler`.
///
/// interval  - The minimum time, in seconds, between two values sent by the
///             returned signal.
/// scheduler - The scheduler upon which held-back values are delivered once
///             the interval has elapsed. This must not be nil or
///             +[RACScheduler immediateScheduler].
///
/// Returns a signal which sends the receiver's latest value at most once per
/// `interval`. When the receiver completes, any held-back value is sent
/// immediately before the signal completes. When the receiver errors, the
/// error is forwarded immediately and any held-back value is discarded.
- (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler;

/// Resubscribes to the receiver when it completes, after waiting for the
/// specified delay. This is the equivalent of the standard -repeat operation,
/// with the addition of a delay to avoid fast cycles for signals that complete
/// quickly.
///
/// delay - The delay, in seconds, after which the receiver is resubscribed to.
///         The delay is scheduled on the scheduler that is current at the time
///         the receiver completes.
///
/// Returns a signal which passes all `next`s to its subscribers. On `error` it
/// errors too, effectively breaking the resubscription cycle. It never sends
/// `completed` by itself.
- (RACSignal *)repeatAfter:(NSTimeInterval)delay;

@end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Created by Florent Pillet on 30/01/15. | |
// | |
#import "RACSignal+FPOperations.h" | |
@implementation RACSignal (FPOperations) | |
// Code extracted from ReactiveCocoa and modified to wait for a delay before
// each resubscription.
//
// Subscribes to the given signal with the given blocks.
//
// If the signal errors or completes, the corresponding block is invoked with
// the disposable that controls the whole cycle. If that disposable is _not_
// disposed by the block, the signal is subscribed to again after `delay`
// seconds.
//
// signal    - The signal to subscribe to, over and over.
// delay     - The time to wait, in seconds, between a termination of `signal`
//             and the next subscription.
// next      - Invoked with every value sent by `signal`.
// error     - Invoked when `signal` errors. Dispose the passed disposable to
//             stop the cycle.
// completed - Invoked when `signal` completes. Dispose the passed disposable
//             to stop the cycle.
//
// Returns a disposable which tears down the current subscription, cancels any
// pending delay and prevents further resubscriptions.
static RACDisposable *subscribeForever (RACSignal *signal, NSTimeInterval delay, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
    next = [next copy];
    error = [error copy];
    completed = [completed copy];

    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    // Tracks the timer of the pending resubscription, so that disposing the
    // cycle also cancels the delay. A single serial disposable is reused for
    // every cycle to keep the compound disposable from growing.
    RACSerialDisposable *delayDisposable = [[RACSerialDisposable alloc] init];
    [compoundDisposable addDisposable:delayDisposable];

    RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
        RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
        [compoundDisposable addDisposable:selfDisposable];

        __weak RACDisposable *weakSelfDisposable = selfDisposable;

        void (^recurseAfterDelay)(void) = ^{
            // The handler asked to stop (or the caller unsubscribed): don't
            // bother arming a timer that would do nothing.
            if (compoundDisposable.disposed) return;

            // +currentScheduler is nil when the signal terminates on a thread
            // that is not managed by a RACScheduler. Messaging nil would
            // silently drop the resubscription, so fall back to a background
            // scheduler.
            RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
            delayDisposable.disposable = [delayScheduler afterDelay:delay schedule:recurse];
        };

        RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
            @autoreleasepool {
                error(e, compoundDisposable);
                [compoundDisposable removeDisposable:weakSelfDisposable];
            }

            recurseAfterDelay();
        } completed:^{
            @autoreleasepool {
                completed(compoundDisposable);
                [compoundDisposable removeDisposable:weakSelfDisposable];
            }

            recurseAfterDelay();
        }];

        [selfDisposable addDisposable:subscriptionDisposable];
    };

    // Subscribe once immediately, and then use recursive scheduling for any
    // further resubscriptions.
    recursiveBlock(^{
        RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];

        RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
        [compoundDisposable addDisposable:schedulingDisposable];
    });

    return compoundDisposable;
}
/// Forwards the receiver's values, letting at most one through per `interval`
/// seconds.
///
/// A value is forwarded immediately when at least `interval` seconds have
/// elapsed since the previously forwarded value (or when nothing has been
/// forwarded yet). Otherwise it becomes the pending value, replacing any
/// earlier pending one, and is sent on `scheduler` as soon as the interval has
/// elapsed. On completion the pending value is flushed before `completed` is
/// sent; on error the pending value is discarded.
///
/// interval  - The minimum time, in seconds, between two forwarded values.
/// scheduler - The scheduler on which pending values are delivered. Must not
///             be nil or +[RACScheduler immediateScheduler].
///
/// Returns the throttled signal.
- (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
    NSCParameterAssert(scheduler != nil);
    NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];

        // All of the state below is protected by @synchronized (timerDisposable).
        __block id latestValue = nil;
        // A separate flag is needed because nil is a legitimate `next` value.
        __block BOOL hasLatestValue = NO;
        __block BOOL hasSentValue = NO;
        // Monotonic timestamp of the last forwarded value.
        __block NSTimeInterval lastSendTime = 0;

        // Cancels the pending timer (if any) and forwards the pending value
        // (if any). Used both as the timer callback and to flush on completion.
        void (^sendLatest)(void) = ^{
            @synchronized (timerDisposable) {
                [timerDisposable.disposable dispose];
                timerDisposable.disposable = nil;

                if (hasLatestValue) {
                    id value = latestValue;
                    latestValue = nil;
                    hasLatestValue = NO;

                    hasSentValue = YES;
                    lastSendTime = [NSProcessInfo processInfo].systemUptime;
                    [subscriber sendNext:value];
                }
            }
        };

        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (timerDisposable) {
                if (timerDisposable.disposable != nil) {
                    // A delivery is already scheduled: just replace the
                    // value it will deliver.
                    latestValue = x;
                    hasLatestValue = YES;
                } else {
                    NSTimeInterval now = [NSProcessInfo processInfo].systemUptime;
                    NSTimeInterval elapsed = now - lastSendTime;

                    if (!hasSentValue || elapsed >= interval) {
                        hasSentValue = YES;
                        lastSendTime = now;
                        [subscriber sendNext:x];
                    } else {
                        // Too soon after the previous value (which may have
                        // been sent by the timer): hold this one back for
                        // the remainder of the interval.
                        latestValue = x;
                        hasLatestValue = YES;
                        timerDisposable.disposable = [scheduler afterDelay:interval - elapsed schedule:sendLatest];
                    }
                }
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            sendLatest();
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [timerDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -sampleAtInterval: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
/// Builds a signal that relays the receiver's values and, each time the
/// receiver completes, subscribes to it again after `delay` seconds. An error
/// from the receiver is relayed and ends the cycle.
- (RACSignal *)repeatAfter:(NSTimeInterval)delay {
    RACSignal *repeating = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        void (^relayValue)(id) = ^(id value) {
            [subscriber sendNext:value];
        };

        void (^relayFailure)(NSError *, RACDisposable *) = ^(NSError *failure, RACDisposable *cycle) {
            // Disposing the cycle before relaying the error is what stops
            // subscribeForever from subscribing again.
            [cycle dispose];
            [subscriber sendError:failure];
        };

        void (^keepCycling)(RACDisposable *) = ^(RACDisposable *cycle) {
            // Intentionally empty: leaving the cycle undisposed means
            // "resubscribe".
        };

        return subscribeForever(self, delay, relayValue, relayFailure, keepCycling);
    }];

    return [repeating setNameWithFormat:@"[%@] -repeatAfter: %f", self.name, delay];
}
@end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment