Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active January 11, 2025 11:54
Show Gist options
  • Save danielt1263/ab32f91a27c8c8e6d2d0fa0aae6afa5c to your computer and use it in GitHub Desktop.
Save danielt1263/ab32f91a27c8c8e6d2d0fa0aae6afa5c to your computer and use it in GitHub Desktop.
//
// ThrottleDebounceLatest.swift
//
// Created by Daniel Tartaglia on 14 Oct 2023.
// Copyright © 2025 Daniel Tartaglia. MIT License.
//
import Foundation
import RxSwift
extension ObservableType {
/**
returns an Observable that emits the first item emitted by the source Observable then ignores elements from the
source which are followed by another element within a specified relative time duration, using the specified
scheduler to run throttling timers.
- parameter dueTime: Throttling duration for each element.
- parameter scheduler: Scheduler to run the throttle timers on.
- returns: The throttled sequence.
*/
func throttleDebounceLatest(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
Observable.create { observer in
var lastFire = RxTime?.none
var nextEmit = (Event<Element>, Disposable)?.none
let lock = NSRecursiveLock()
func delay(event: Event<Self.Element>) -> Disposable {
scheduler.scheduleRelative((), dueTime: dueTime) {
lock.lock(); defer { lock.unlock() }
observer.on(event)
nextEmit = nil
return Disposables.create()
}
}
return self.subscribe { event in
lock.lock(); defer { lock.unlock() }
switch event {
case .next:
if lastFire == nil || dueTime.asTimeInterval < scheduler.now.timeIntervalSince(lastFire!) {
observer.on(event)
} else {
nextEmit?.1.dispose()
nextEmit = (event, delay(event: event))
}
lastFire = scheduler.now
case .error:
nextEmit?.1.dispose()
observer.on(event)
case .completed:
if let nextEmit {
nextEmit.1.dispose()
observer.on(nextEmit.0)
observer.on(event)
} else {
observer.on(event)
}
}
}
}
}
}
private extension DispatchTimeInterval {
var asTimeInterval: TimeInterval {
switch self {
case .nanoseconds(let val): return Double(val) / 1_000_000_000.0
case .microseconds(let val): return Double(val) / 1_000_000.0
case .milliseconds(let val): return Double(val) / 1_000.0
case .seconds(let val): return Double(val)
case .never: return Double.infinity
@unknown default: fatalError()
}
}
}
//
// ThrottleDebounceLatestTests.swift
//
// Created by Daniel Tartaglia on 14 Oct 2023.
// Copyright © 2025 Daniel Tartaglia. MIT License.
//
import RxTest
import XCTest
final class ThrottleDebounceTests: XCTestCase {
func test() {
let scheduler = TestScheduler(initialClock: 0)
let source = scheduler.createObservable(timeline: "-A-B-C---D-|")
let expected = parseTimeline( "-A-----C-D-|")
.offsetTime(by: 200)
let actual = scheduler.start {
source.throttleDebounceLatest(dueTime: .seconds(2), scheduler: scheduler)
}
XCTAssertEqual(actual.events, expected[0])
}
}
@danielt1263
Copy link
Author

@chance395 Your operator doesn't have the same behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment