Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active January 11, 2025 11:54
Show Gist options
  • Save danielt1263/ab32f91a27c8c8e6d2d0fa0aae6afa5c to your computer and use it in GitHub Desktop.
Save danielt1263/ab32f91a27c8c8e6d2d0fa0aae6afa5c to your computer and use it in GitHub Desktop.
//
// ThrottleDebounceLatest.swift
//
// Created by Daniel Tartaglia on 14 Oct 2023.
// Copyright © 2025 Daniel Tartaglia. MIT License.
//
import Foundation
import RxSwift
extension ObservableType {
	/**
	 Returns an Observable that forwards an element immediately when the source has been quiet for longer than
	 `dueTime`, and otherwise holds the element back and delivers it `dueTime` later unless a newer element
	 replaces it first.

	 - The very first element is always forwarded immediately.
	 - The quiet period is measured from the previous *source* element, whether or not that element was forwarded
	   immediately.
	 - When the source completes while an element is being held back, that element is delivered right away and is
	   followed by the completion event.
	 - When the source errors, any held-back element is discarded and the error is forwarded.
	 - Disposing the subscription also cancels the pending delivery timer.

	 - parameter dueTime: Quiet period that must elapse for an element to be forwarded immediately; also the delay
	   applied to a held-back element.
	 - parameter scheduler: Scheduler used as the clock and to run the delivery timers.
	 - returns: The throttled/debounced sequence.
	 */
	func throttleDebounceLatest(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
		Observable.create { observer in
			// Scheduler time at which the source most recently produced a `.next` event.
			var lastFire = RxTime?.none
			// The held-back event paired with the timer that will deliver it.
			var nextEmit = (Event<Element>, Disposable)?.none
			// Serialises source events, timer callbacks and disposal. Recursive because downstream may dispose
			// synchronously from inside `observer.on`, re-entering on the same thread.
			let lock = NSRecursiveLock()

			// Schedules `event` for delivery after `dueTime` and clears the pending slot once delivered.
			func delay(event: Event<Self.Element>) -> Disposable {
				scheduler.scheduleRelative((), dueTime: dueTime) {
					lock.lock(); defer { lock.unlock() }
					observer.on(event)
					nextEmit = nil
					return Disposables.create()
				}
			}

			let subscription = self.subscribe { event in
				lock.lock(); defer { lock.unlock() }
				switch event {
				case .next:
					// No previous element counts as "quiet"; otherwise the gap must strictly exceed `dueTime`.
					let sourceWasQuiet = lastFire.map { dueTime.asTimeInterval < scheduler.now.timeIntervalSince($0) } ?? true
					if sourceWasQuiet {
						observer.on(event)
					} else {
						// Replace whatever was being held back with the newer element.
						nextEmit?.1.dispose()
						nextEmit = (event, delay(event: event))
					}
					lastFire = scheduler.now
				case .error:
					nextEmit?.1.dispose()
					nextEmit = nil
					observer.on(event)
				case .completed:
					if let pending = nextEmit {
						// Flush the held-back element now instead of waiting for its timer.
						pending.1.dispose()
						nextEmit = nil
						observer.on(pending.0)
					}
					observer.on(event)
				}
			}

			return Disposables.create {
				subscription.dispose()
				// Cancel the pending delivery so no scheduled work outlives the subscription.
				lock.lock(); defer { lock.unlock() }
				nextEmit?.1.dispose()
				nextEmit = nil
			}
		}
	}
}
private extension DispatchTimeInterval {
	/// The interval expressed in seconds as a `TimeInterval`.
	///
	/// `.never` maps to `Double.infinity`. An interval case unknown to this code traps.
	var asTimeInterval: TimeInterval {
		// Pair the raw count with how many of that unit make up one second, then divide once at the end.
		let parts: (count: Int, unitsPerSecond: Double)
		switch self {
		case let .nanoseconds(count):
			parts = (count, 1_000_000_000.0)
		case let .microseconds(count):
			parts = (count, 1_000_000.0)
		case let .milliseconds(count):
			parts = (count, 1_000.0)
		case let .seconds(count):
			parts = (count, 1.0)
		case .never:
			return Double.infinity
		@unknown default:
			fatalError()
		}
		return Double(parts.count) / parts.unitsPerSecond
	}
}
//
// ThrottleDebounceLatestTests.swift
//
// Created by Daniel Tartaglia on 14 Oct 2023.
// Copyright © 2025 Daniel Tartaglia. MIT License.
//
import RxTest
import XCTest
final class ThrottleDebounceTests: XCTestCase {
	/// Runs `throttleDebounceLatest` over a marble timeline and compares the recorded events with the expected
	/// timeline.
	///
	/// Per the two timelines: `A` passes straight through, `B` never appears in the output, `C` appears two
	/// positions later than it was emitted by the source, and `D` and the completion appear where the source
	/// produced them.
	func test() {
		let testScheduler = TestScheduler(initialClock: 0)
		let input = testScheduler.createObservable(timeline: "-A-B-C---D-|")
		// NOTE(review): the 200 offset presumably matches the time at which `start` subscribes — confirm
		// against the timeline helpers, which are defined outside this file.
		let expectedTimelines = parseTimeline("-A-----C-D-|").offsetTime(by: 200)

		let recorded = testScheduler.start {
			input.throttleDebounceLatest(dueTime: .seconds(2), scheduler: testScheduler)
		}

		XCTAssertEqual(recorded.events, expectedTimelines[0])
	}
}
@chance395
Copy link

/// Variant of `throttleDebounceLatest` composed from the built-in `throttle`, `flatMapLatest` and `debounce`
/// operators.
///
/// NOTE(review): the gist author replies below that this operator does not have the same behavior as the
/// hand-written one, so treat it as a sketch rather than a drop-in replacement.
///
/// - parameter dueTime: Interval passed to both `throttle` and `debounce`.
/// - parameter scheduler: Scheduler passed to both `throttle` and `debounce`.
/// - returns: The resulting sequence.
func throttleDebounceLatest(dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
    // Use a `ReplaySubject` to manage the latest event. The element type must be spelled out because
    // `create(bufferSize:)` gives the compiler nothing to infer it from.
    let latestEvent = ReplaySubject<Element>.create(bufferSize: 1)
    let isCompleted = Atomic(false) // Atomic flag to track completion

    return self
        .do(onNext: { event in
            guard !isCompleted.value else { return }
            latestEvent.onNext(event)
        }, onError: { error in
            guard !isCompleted.value else { return }
            latestEvent.onError(error)
            isCompleted.value = true
        }, onCompleted: {
            guard !isCompleted.value else { return }
            latestEvent.onCompleted()
            isCompleted.value = true
        })
        .throttle(dueTime, scheduler: scheduler) // Emit first event immediately
        .flatMapLatest { _ in
            latestEvent.debounce(dueTime, scheduler: scheduler) // Emit the latest event after debounce interval
        }
}

}

/// Wrapper that serialises reads and writes of a single value through a private serial dispatch queue.
///
/// Each individual `get` or `set` is serialised; a read followed by a write (e.g. `x.value += 1`) is two
/// separate operations and is not atomic as a whole.
final class Atomic<T> {
    // Serial queue used as a mutual-exclusion primitive; never reassigned, hence `let`.
    private let lock = DispatchQueue(label: "com.atomic.lock")
    private var _value: T

    /// Creates a wrapper holding `value`.
    init(_ value: T) {
        self._value = value
    }

    /// The wrapped value. Both the getter and the setter run synchronously on the private queue.
    var value: T {
        get {
            return lock.sync { _value }
        }
        set {
            lock.sync { _value = newValue }
        }
    }
}

@danielt1263
Copy link
Author

@chance395 Your operator doesn't have the same behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment