Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active June 20, 2023 10:51
Show Gist options
  • Save danielt1263/fbfa3b56614b474f39862fc293a305f3 to your computer and use it in GitHub Desktop.
Save danielt1263/fbfa3b56614b474f39862fc293a305f3 to your computer and use it in GitHub Desktop.
I was asked to create a shared cache.
//
// SharedCache+Rx.swift
//
// Created by Daniel Tartaglia on 12 Jun 2021.
// Copyright © 2023 Daniel Tartaglia. MIT License.
//
import RxSwift
extension ObservableType {
    /**
     Shares the source across multiple subscriptions by recording its events in an unbounded `ReplaySubject`.

     The first subscription subscribes to the source; later subscriptions receive a replay of everything recorded so
     far followed by any further events. The shared resource is stored until `keepTime` after the source completes.
     A subscription that comes after that will cause a resubscribe to the source. If the source emits an error, the
     cache is cleared before the error is forwarded.

     The subscription to the source is intentionally not tied to any single subscriber: its disposable is discarded,
     so disposing a subscriber does not stop the source from filling the cache.

     - Parameters:
       - keepTime: How long to keep the shared stream after it completes.
       - scheduler: Scheduler used to dispose the cached stream at the right time.
     - Returns: The shared sequence.
     */
    func sharedCache(keepTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        var cache: ReplaySubject<Element>?
        // Recursive, because the source may emit synchronously while the `create` closure still holds the lock.
        let lock = NSRecursiveLock()
        var scheduleDisposable: Disposable?
        return Observable.create { observer in
            lock.lock(); defer { lock.unlock() }
            if let cache {
                return cache.subscribe(observer)
            }
            // Keep a strong, non-optional reference to this generation's subject. The callbacks below must not go
            // through `cache`, because it can already be nil again by the time they (or the final subscribe) run,
            // e.g. when the source fails synchronously during `self.subscribe`.
            let subject = ReplaySubject<Element>.createUnbounded()
            cache = subject
            _ = self.subscribe(
                onNext: { element in
                    lock.lock(); defer { lock.unlock() }
                    subject.onNext(element)
                },
                onError: { error in
                    lock.lock(); defer { lock.unlock() }
                    scheduleDisposable?.dispose()
                    // Clear before forwarding, so an observer that resubscribes from within its error handler
                    // reaches the source again instead of replaying the cached error.
                    if cache === subject { cache = nil }
                    subject.onError(error)
                },
                onCompleted: {
                    lock.lock(); defer { lock.unlock() }
                    subject.onCompleted()
                    scheduleDisposable?.dispose()
                    scheduleDisposable = scheduler.scheduleRelative((), dueTime: keepTime, action: { _ in
                        lock.lock(); defer { lock.unlock() }
                        // Only evict the generation this timer was created for.
                        if cache === subject { cache = nil }
                        return Disposables.create()
                    })
                }
            )
            // The subject replays whatever the source has emitted so far, including a synchronous termination.
            return subject.subscribe(observer)
        }
    }
}
class RxSandboxTests: XCTestCase {
    /// A single subscriber driven by `TestScheduler.start` sees the source's events.
    func test() {
        let testScheduler = TestScheduler(initialClock: 0)
        let upstream = testScheduler.createObservable(timeline: "-X|")
        let recorded = testScheduler.start {
            upstream.sharedCache(keepTime: .seconds(3), scheduler: testScheduler)
        }
        // The expected timeline is offset by 200 before comparing (RxTest's `start` subscribes at time 200
        // by default).
        let timelines = parseEventsAndTimes(timeline: "-X|", values: { "\($0)" })
            .offsetTime(by: 200)
        XCTAssertEqual(recorded.events, timelines[0])
    }

    /// A second subscriber arriving at time 2 with a keep time of 3 seconds is expected to receive `X` again
    /// rather than `Y`.
    func test1() {
        let (early, late) = observeTwice(sourceTimeline: "-X|-Y|", keepTime: .seconds(3), lateSubscriptionTime: 2)
        let timelines = parseEventsAndTimes(timeline: "-X|--X|", values: { "\($0)" })
        XCTAssertEqual(early.events, timelines[0])
        XCTAssertEqual(late.events, timelines[1])
    }

    /// A second subscriber arriving at time 5 with a keep time of 3 seconds is expected to receive `Y`.
    func test2() {
        let (early, late) = observeTwice(sourceTimeline: "-X|-Y|", keepTime: .seconds(3), lateSubscriptionTime: 5)
        let timelines = parseEventsAndTimes(timeline: "-X|------Y|", values: { "\($0)" })
        XCTAssertEqual(early.events, timelines[0])
        XCTAssertEqual(late.events, timelines[1])
    }

    /// When the first timeline ends in an error, a second subscriber arriving at time 4 is expected to receive `Y`
    /// even though the keep time is 4 seconds.
    func test3() {
        let (early, late) = observeTwice(sourceTimeline: "-X-#-Y|", keepTime: .seconds(4), lateSubscriptionTime: 4)
        let timelines = parseEventsAndTimes(timeline: "-X-#-----Y|", values: { "\($0)" })
        print("obs1", early.events)
        print("obs2", late.events)
        XCTAssertEqual(early.events, timelines[0])
        XCTAssertEqual(late.events, timelines[1])
    }

    /// A second subscriber arriving at time 2, after `A` but before `B` and `C`, is expected to receive all three
    /// elements followed by completion.
    func test4() {
        let (early, late) = observeTwice(
            sourceTimeline: "-A----BC|-Y|",
            keepTime: .seconds(3),
            lateSubscriptionTime: 2
        )
        let timelines = parseEventsAndTimes(timeline: "-A----BC|--A---BC|", values: { "\($0)" })
        XCTAssertEqual(early.events, timelines[0])
        XCTAssertEqual(late.events, timelines[1])
    }

    /// Builds a `sharedCache` over `sourceTimeline`, subscribes one observer immediately and another at
    /// `lateSubscriptionTime`, runs the scheduler, and returns both observers.
    private func observeTwice(
        sourceTimeline: String,
        keepTime: RxTimeInterval,
        lateSubscriptionTime: Int
    ) -> (early: TestableObserver<String>, late: TestableObserver<String>) {
        let testScheduler = TestScheduler(initialClock: 0)
        let upstream = testScheduler.createObservable(timeline: sourceTimeline)
        let early = testScheduler.createObserver(String.self)
        let late = testScheduler.createObserver(String.self)
        let shared = upstream.sharedCache(keepTime: keepTime, scheduler: testScheduler)
        _ = shared.subscribe(early)
        testScheduler.scheduleAt(lateSubscriptionTime) {
            _ = shared.subscribe(late)
        }
        testScheduler.start()
        return (early, late)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment