Created
October 6, 2018 02:39
-
-
Save toshi0383/bf567dfa62389f8356b13844fa15b2ed to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import RxSwift | |
extension ObservableType {

    /**
     Returns an Observable that emits the first and, optionally, the latest item emitted by the source Observable during sequential time windows whose duration is recomputed as elements are emitted.

     This operator makes sure that no two elements are emitted in less than the currently consulted due time.

     - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html)

     - parameter dueTime: Computes the throttling duration of the next window. It is called with the element that is being emitted immediately and the current duration, and its result becomes the new duration.
     - parameter until: Every next event of this sequence cancels the pending throttle timer and resets the duration to 0.
     - parameter latest: Whether the latest element received inside a throttle window should be emitted once that window elapses.
     - parameter scheduler: Scheduler to run the throttle timers on.
     - returns: The throttled sequence.
     */
    public func throttle<O: ObservableType>(dueTime: @escaping (E, RxTimeInterval) -> RxTimeInterval, until: O, latest: Bool = false, scheduler: SchedulerType)
        -> Observable<E> {
        // Erase both inputs to plain observables before handing them to the producer.
        let throttledSource = self.asObservable()
        let resetTrigger = until.asObservable()

        return Throttle2(
            source: throttledSource,
            dueTime: dueTime,
            until: resetTrigger,
            latest: latest,
            scheduler: scheduler
        )
    }
}
/// Sink that implements the throttling logic configured by `Throttle2`.
///
/// It observes the source sequence, forwards elements that arrive outside the
/// current throttle window immediately and, when `latest` is enabled, schedules
/// the most recent suppressed element for delivery at the end of the window.
/// Every next event of the `until` sequence resets the throttle state.
final fileprivate class ThrottleSink2<O: ObserverType, UntilElement>
    : Sink<O>
    , ObserverType
    , LockOwnerType
    , SynchronizedOnType {
    typealias Element = O.E
    typealias ParentType = Throttle2<Element, UntilElement>

    private let _parent: ParentType

    /// Guards all mutable state below.
    let _lock = RecursiveLock()

    // state
    /// Latest element suppressed inside the current window, waiting for the timer.
    private var _lastUnsentElement: Element? = nil
    /// Scheduler time recorded right after the last forwarded element.
    private var _lastSentTime: Date? = nil
    /// Set when the source completed while an element was still waiting for the timer.
    private var _completed: Bool = false
    /// Duration of the current throttle window; recomputed by the parent's `_dueTime` closure.
    private var _currentDueTime: RxTimeInterval = 0

    /// Holds the timer that delivers `_lastUnsentElement`.
    let cancellable = SerialDisposable()

    init(parent: ParentType, observer: O, cancel: Cancelable) {
        _parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    /// Subscribes to the reset trigger and to the source.
    ///
    /// - returns: A disposable covering both subscriptions and the pending timer.
    func run() -> Disposable {
        // The trigger is subscribed first, matching the original ordering.
        let untilSubscription = _parent._until.subscribe(onNext: { [weak self] _ in
            self?.resetThrottle()
        })
        let subscription = _parent._source.subscribe(self)

        return Disposables.create(subscription, untilSubscription, cancellable)
    }

    /// Resets the throttle window in response to a next event of the `until` sequence.
    ///
    /// Runs under `_lock` because the trigger may fire on a different thread than
    /// the source or the timer. The element waiting for the cancelled timer is
    /// dropped, and a completion that was parked behind that timer is forwarded
    /// now, since the timer will no longer fire to deliver it.
    private func resetThrottle() {
        _lock.lock(); defer { _lock.unlock() }

        cancellable.disposable.dispose()
        _currentDueTime = 0
        _lastSentTime = nil
        // Without this, a later `.completed` would wait forever for a timer that no longer exists.
        _lastUnsentElement = nil

        if _completed {
            _completed = false
            forwardOn(.completed)
            dispose()
        }
    }

    func on(_ event: Event<Element>) {
        synchronizedOn(event)
    }

    /// Handles a source event. Invoked through `synchronizedOn(_:)`
    /// (NOTE(review): expected to be called with `_lock` held — confirm against `SynchronizedOnType`).
    func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            let now = _parent._scheduler.now

            // With no previous emission the element is treated as if the whole window had already elapsed.
            let timeIntervalSinceLast: RxTimeInterval
            if let lastSendingTime = _lastSentTime {
                timeIntervalSinceLast = now.timeIntervalSince(lastSendingTime)
            } else {
                timeIntervalSinceLast = _currentDueTime
            }

            let couldSendNow = timeIntervalSinceLast >= _currentDueTime
            if couldSendNow {
                // Ask the caller for the duration of the next window before emitting.
                _currentDueTime = _parent._dueTime(element, _currentDueTime)
                sendNow(element: element)
                return
            }

            guard _parent._latest else {
                return
            }

            let isThereAlreadyInFlightRequest = _lastUnsentElement != nil
            _lastUnsentElement = element

            // A timer is already pending; it will pick up the element stored above.
            if isThereAlreadyInFlightRequest {
                return
            }

            let scheduler = _parent._scheduler
            let d = SingleAssignmentDisposable()
            cancellable.disposable = d

            // Fire when the remainder of the current window has elapsed.
            d.setDisposable(scheduler.scheduleRelative(0, dueTime: _currentDueTime - timeIntervalSinceLast, action: self.propagate))
        case .error:
            _lastUnsentElement = nil
            forwardOn(event)
            dispose()
        case .completed:
            if _lastUnsentElement != nil {
                // Defer completion until the pending element has been delivered.
                _completed = true
            } else {
                forwardOn(.completed)
                dispose()
            }
        }
    }

    /// Forwards `element` downstream and records the emission time.
    private func sendNow(element: Element) {
        _lastUnsentElement = nil
        forwardOn(.next(element))
        // in case element processing takes a while, this should give some more room
        _lastSentTime = _parent._scheduler.now
    }

    /// Timer action: delivers the pending element, then any deferred completion.
    func propagate(_: Int) -> Disposable {
        _lock.lock(); defer { _lock.unlock() }

        if let lastUnsentElement = _lastUnsentElement {
            sendNow(element: lastUnsentElement)
        }

        if _completed {
            forwardOn(.completed)
            dispose()
        }

        return Disposables.create()
    }
}
/// Producer backing `throttle(dueTime:until:latest:scheduler:)`.
///
/// It only stores the operator's configuration; the throttling itself is done
/// by the `ThrottleSink2` created for each subscription in `run(_:cancel:)`.
final fileprivate class Throttle2<Element, UntilElement>: Producer<Element> {
    /// Sequence being throttled.
    fileprivate let _source: Observable<Element>
    /// Computes the next throttle duration from an element and the current duration.
    fileprivate let _dueTime: (Element, RxTimeInterval) -> RxTimeInterval
    /// Sequence whose next events reset the throttle state.
    fileprivate let _until: Observable<UntilElement>
    /// Whether the latest suppressed element is delivered when the window ends.
    fileprivate let _latest: Bool
    /// Scheduler supplying the current time and running the throttle timers.
    fileprivate let _scheduler: SchedulerType

    init(source: Observable<Element>, dueTime: @escaping (Element, RxTimeInterval) -> RxTimeInterval, until: Observable<UntilElement>, latest: Bool, scheduler: SchedulerType) {
        self._source = source
        self._dueTime = dueTime
        self._until = until
        self._latest = latest
        self._scheduler = scheduler
    }

    /// Creates the sink for `observer` and starts it.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let throttleSink = ThrottleSink2(parent: self, observer: observer, cancel: cancel)
        let running = throttleSink.run()
        return (sink: throttleSink, subscription: running)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment