Instantly share code, notes, and snippets.
Last active
June 20, 2023 10:51
-
Star
(2)
2
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save danielt1263/fbfa3b56614b474f39862fc293a305f3 to your computer and use it in GitHub Desktop.
I was asked to create a shared cache.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SharedCache+Rx.swift | |
// | |
// Created by Daniel Tartaglia on 12 Jun 2021. | |
// Copyright © 2023 Daniel Tartaglia. MIT License. | |
// | |
import RxSwift | |
extension ObservableType { | |
/** | |
Shares the source across multiple subscriptions. It will store the shared resource until keepTime after the source | |
completes. A subscription that comes after that will cause a resubscribe to the source. If the source emits an | |
error, the cache will be cleared. | |
- Parameters: | |
- keepTime: How long to keep the shared stream after it completes. | |
- scheduler: Scheduler used to dispose the cached stream at the right time. | |
- Returns: The shared sequence. | |
*/ | |
func sharedCache(keepTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> { | |
var cache: ReplaySubject<Element>? | |
let lock = NSRecursiveLock() | |
var scheduleDisposable: Disposable? | |
return Observable.create { observer in | |
lock.lock(); defer { lock.unlock() } | |
if let cache { | |
return cache.subscribe(observer) | |
} else { | |
cache = ReplaySubject<Element>.createUnbounded() | |
_ = self.subscribe( | |
onNext: { element in | |
lock.lock(); defer { lock.unlock() } | |
cache!.onNext(element) | |
}, | |
onError: { error in | |
lock.lock(); defer { lock.unlock() } | |
scheduleDisposable?.dispose() | |
cache!.onError(error) | |
cache = nil | |
}, | |
onCompleted: { | |
lock.lock(); defer { lock.unlock() } | |
cache!.onCompleted() | |
scheduleDisposable?.dispose() | |
scheduleDisposable = scheduler.scheduleRelative((), dueTime: keepTime, action: { _ in | |
lock.lock(); defer { lock.unlock() } | |
cache = nil | |
return Disposables.create() | |
}) | |
} | |
) | |
return cache! | |
.subscribe(observer) | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class RxSandboxTests: XCTestCase { | |
func test() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-X|") | |
let result = scheduler.start { | |
source.sharedCache(keepTime: .seconds(3), scheduler: scheduler) | |
} | |
let expected = parseEventsAndTimes(timeline: "-X|", values: { "\($0)" }) | |
.offsetTime(by: 200) | |
XCTAssertEqual(result.events, expected[0]) | |
} | |
func test1() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-X|-Y|") | |
let obs1 = scheduler.createObserver(String.self) | |
let obs2 = scheduler.createObserver(String.self) | |
let data = source | |
.sharedCache(keepTime: .seconds(3), scheduler: scheduler) | |
_ = data | |
.subscribe(obs1) | |
scheduler.scheduleAt(2) { | |
_ = data.subscribe(obs2) | |
} | |
scheduler.start() | |
let expected = parseEventsAndTimes(timeline: "-X|--X|", values: { "\($0)" }) | |
XCTAssertEqual(obs1.events, expected[0]) | |
XCTAssertEqual(obs2.events, expected[1]) | |
} | |
func test2() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-X|-Y|") | |
let obs1 = scheduler.createObserver(String.self) | |
let obs2 = scheduler.createObserver(String.self) | |
let data = source | |
.sharedCache(keepTime: .seconds(3), scheduler: scheduler) | |
_ = data | |
.subscribe(obs1) | |
scheduler.scheduleAt(5) { | |
_ = data.subscribe(obs2) | |
} | |
scheduler.start() | |
let expected = parseEventsAndTimes(timeline: "-X|------Y|", values: { "\($0)" }) | |
XCTAssertEqual(obs1.events, expected[0]) | |
XCTAssertEqual(obs2.events, expected[1]) | |
} | |
func test3() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-X-#-Y|") | |
let obs1 = scheduler.createObserver(String.self) | |
let obs2 = scheduler.createObserver(String.self) | |
let data = source | |
.sharedCache(keepTime: .seconds(4), scheduler: scheduler) | |
_ = data | |
.subscribe(obs1) | |
scheduler.scheduleAt(4) { | |
_ = data.subscribe(obs2) | |
} | |
scheduler.start() | |
let expected = parseEventsAndTimes(timeline: "-X-#-----Y|", values: { "\($0)" }) | |
print("obs1", obs1.events) | |
print("obs2", obs2.events) | |
XCTAssertEqual(obs1.events, expected[0]) | |
XCTAssertEqual(obs2.events, expected[1]) | |
} | |
func test4() { | |
let scheduler = TestScheduler(initialClock: 0) | |
let source = scheduler.createObservable(timeline: "-A----BC|-Y|") | |
let obs1 = scheduler.createObserver(String.self) | |
let obs2 = scheduler.createObserver(String.self) | |
let data = source | |
.sharedCache(keepTime: .seconds(3), scheduler: scheduler) | |
_ = data | |
.subscribe(obs1) | |
scheduler.scheduleAt(2) { | |
_ = data.subscribe(obs2) | |
} | |
scheduler.start() | |
let expected = parseEventsAndTimes(timeline: "-A----BC|--A---BC|", values: { "\($0)" }) | |
XCTAssertEqual(obs1.events, expected[0]) | |
XCTAssertEqual(obs2.events, expected[1]) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment