Created
December 5, 2017 21:24
-
-
Save Vladlex/ea02dba926feeb369edd621be424dc52 to your computer and use it in GitHub Desktop.
RxSwift Extension: Collect until no event comes in a given time interval
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public extension Observable {
    /// Collects elements of the source sequence into a buffer and emits the buffer as one array
    /// once no new element has arrived for `delay`.
    ///
    /// Every source element is added to the buffer and restarts the quiet-period timer. When the
    /// timer fires, the buffered elements are emitted as a single array and the buffer is cleared.
    /// An empty buffer (possible when `bufferMutator` removes elements) is not emitted.
    ///
    /// The quiet period is timed with `debounce` on `MainScheduler.asyncInstance`. Completion and
    /// error events of the source are routed through the same debounced sequence before they are
    /// forwarded to the observer.
    ///
    /// - Parameters:
    ///   - delay: Quiet interval that must pass without a new element before the buffer is emitted.
    ///   - bufferMutator: Block that merges a new element into the buffer. If `nil`, new elements
    ///     are simply appended to the buffer.
    /// - Returns: An observable that emits arrays of collected elements, one array per quiet period.
    public func collect(untilNoEventComesIn delay: RxTimeInterval, bufferMutator: ((inout [E], E) -> Void)? = nil) -> Observable<[E]> {
        return Observable<[E]>.create { observer in
            var buffer: [E] = []
            // Guards `buffer`, which is touched both by the source handler and by the debounced
            // handler. Recursive so the debounced handler may run on a thread that is already
            // inside the source handler (e.g. if the subject delivers an event synchronously).
            let lock = NSRecursiveLock()
            // Pulse stream: one `Void` per source element; its debounced form marks "source went quiet".
            let debouncer = PublishSubject<Void>()

            let debouncerDisposable = debouncer
                .debounce(delay, scheduler: MainScheduler.asyncInstance)
                .subscribe { event in
                    lock.lock()
                    defer { lock.unlock() }

                    switch event {
                    case .next:
                        // Nothing to deliver if the mutator left the buffer empty.
                        guard !buffer.isEmpty else { return }
                        let collected = buffer
                        buffer.removeAll()
                        observer.onNext(collected)
                    case .completed:
                        // NOTE(review): relies on `debounce` delivering any pending pulse before
                        // completion so the last batch is not lost — confirm for the RxSwift version in use.
                        observer.onCompleted()
                    case .error(let error):
                        observer.onError(error)
                    }
                }

            let sourceDisposable = self.subscribe { event in
                lock.lock()
                defer { lock.unlock() }

                switch event {
                case .next(let element):
                    if let mutator = bufferMutator {
                        mutator(&buffer, element)
                    } else {
                        buffer.append(element)
                    }
                    // Explicit `()` argument: the implicit zero-argument form is not accepted by
                    // every Swift version for a `Void` parameter.
                    debouncer.onNext(())
                case .completed:
                    debouncer.onCompleted()
                case .error(let error):
                    debouncer.onError(error)
                }
            }

            return Disposables.create([sourceDisposable, debouncerDisposable])
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment