Last active
January 11, 2025 11:54
-
-
Save danielt1263/ab32f91a27c8c8e6d2d0fa0aae6afa5c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// ThrottleDebounceLatest.swift | |
// | |
// Created by Daniel Tartaglia on 14 Oct 2023. | |
// Copyright © 2025 Daniel Tartaglia. MIT License. | |
// | |
import Foundation | |
import RxSwift | |
extension ObservableType {
    /**
     Returns an Observable that forwards a source element immediately when it is the first element or when more than
     `dueTime` has elapsed since the previous source element. Otherwise the element is held back and emitted once
     `dueTime` has passed without it being replaced; a newer held-back element replaces an older one.

     When the source completes, a held-back element is emitted right before `.completed`. When the source errors,
     a held-back element is dropped and the error is forwarded.

     - parameter dueTime: Throttling duration for each element.
     - parameter scheduler: Scheduler to run the throttle timers on.
     - returns: The throttled sequence.
     */
    func throttleDebounceLatest(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        Observable.create { observer in
            let lock = NSRecursiveLock()
            // Owns the currently scheduled timer. Assigning a new timer disposes the previous one, and disposing
            // the serial disposable itself cancels whatever timer is still outstanding.
            let timer = SerialDisposable()
            var lastFire: RxTime?
            var pending: Event<Element>?
            // Bumped for every scheduled timer so that a timer action which was already waiting on the lock when
            // it got superseded can recognise that it is stale and do nothing.
            var generation = 0

            // Holds `event` back and (re)starts the timer that will eventually emit it.
            func delay(event: Event<Element>) {
                generation += 1
                let token = generation
                pending = event
                timer.disposable = scheduler.scheduleRelative((), dueTime: dueTime) { _ in
                    lock.lock(); defer { lock.unlock() }
                    guard token == generation, let held = pending else { return Disposables.create() }
                    pending = nil
                    observer.on(held)
                    return Disposables.create()
                }
            }

            let subscription = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .next:
                    if let last = lastFire, scheduler.now.timeIntervalSince(last) <= dueTime.asTimeInterval {
                        // Arrived too soon after the previous element: hold it back.
                        delay(event: event)
                    } else {
                        observer.on(event)
                    }
                    // The quiet period is measured between source elements, not between emissions.
                    lastFire = scheduler.now
                case .error:
                    pending = nil
                    timer.dispose()
                    observer.on(event)
                case .completed:
                    timer.dispose()
                    if let held = pending {
                        // Flush the held-back element so the latest value is not lost on completion.
                        pending = nil
                        observer.on(held)
                    }
                    observer.on(event)
                }
            }

            // Disposing the result must also cancel an outstanding timer, not only the source subscription.
            return Disposables.create(subscription, timer)
        }
    }
}
private extension DispatchTimeInterval {
    /// The interval expressed in seconds as a `TimeInterval`.
    ///
    /// `.never` maps to `Double.infinity`. An unrecognised future case traps.
    var asTimeInterval: TimeInterval {
        // Number of units carried by the case, and how many of those units make up one second.
        let scaled: (units: Int, perSecond: Double)
        switch self {
        case let .seconds(count):
            scaled = (count, 1.0)
        case let .milliseconds(count):
            scaled = (count, 1_000.0)
        case let .microseconds(count):
            scaled = (count, 1_000_000.0)
        case let .nanoseconds(count):
            scaled = (count, 1_000_000_000.0)
        case .never:
            return .infinity
        @unknown default:
            fatalError()
        }
        return Double(scaled.units) / scaled.perSecond
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// ThrottleDebounceLatestTests.swift | |
// | |
// Created by Daniel Tartaglia on 14 Oct 2023. | |
// Copyright © 2025 Daniel Tartaglia. MIT License. | |
// | |
import RxTest | |
import XCTest | |
/// Checks `throttleDebounceLatest` against a marble timeline on a virtual-time `TestScheduler`.
final class ThrottleDebounceTests: XCTestCase {
    /// Runs the source timeline through the operator with a 2 second due time and compares the recorded events
    /// with the first parsed expected timeline.
    func test() {
        let testScheduler = TestScheduler(initialClock: 0)
        let input = testScheduler.createObservable(timeline: "-A-B-C---D-|")
        // Shifted by 200, presumably to line up with the time at which `start` subscribes — confirm against RxTest.
        let expectedTimelines = parseTimeline("-A-----C-D-|").offsetTime(by: 200)

        let recorded = testScheduler.start {
            input.throttleDebounceLatest(dueTime: .seconds(2), scheduler: testScheduler)
        }

        XCTAssertEqual(recorded.events, expectedTimelines[0])
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
func throttleDebounceLatest(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
    // Using a `ReplaySubject` to manage the latest event
    let latestEvent = ReplaySubject.create(bufferSize: 1)
    let isCompleted = Atomic(false) // Atomic flag to track completion
}
// Atomic wrapper for thread-safe value manipulation
final class Atomic<T> {
    private var lock = DispatchQueue(label: "com.atomic.lock")
    private var _value: T
}