Created
May 23, 2024 09:45
-
-
Save ovcharik/11c151512c1c4dc201a2c5feec617fdb to your computer and use it in GitHub Desktop.
ObservableStorage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
BehaviorSubject, | |
combineLatest, | |
defaultIfEmpty, | |
filter, | |
iif, | |
map, | |
Observable, | |
observeOn, | |
of, | |
queueScheduler, | |
ReplaySubject, | |
share, | |
shareReplay, | |
startWith, | |
Subject, | |
switchMap, | |
take, | |
} from 'rxjs'; | |
/** | |
* ObservableStorage служит для формирования единого потока изменений из множества взаимно-зависимых | |
* источник. | |
* | |
* Основная проблема при работе с множеством потоков, это их синхронизация. В первую очередь, данная | |
* реализация предназначена для работы с потоками, в которых есть рекурсивные зависимости. | |
* | |
* При помощи DI, ангуляр позволяет, очень гибко настраивать различные связи между компонентами. Но | |
* внедрение зависимостей не позволяет устанавливать отношения между потоками данных в таких связях. | |
* Например, отдельные инструменты для отслеживания потока данных, есть в реактивных формах. И это | |
* достаточно объемный инструментарий, работающих на схожих принципах, что и это хранилище. Основная | |
* проблема в том, что реактивные формы имеют ограниченную специализацию. Данное же решение | |
* полностью полагается на RxJs, что дает большую гибкость в плане вариантов использования. | |
* | |
* В примере ниже показано, как с помощью данного хранилища, организуется система глубокого | |
* отслеживания изменений в древовидной структуре. | |
* | |
* @example | |
* | |
* ```ts | |
* type Node = { title: string; children: Node[] }; | |
* | |
* class NodeBuilder { | |
* protected readonly childrenStorage$ = new ObservableStorage<Node>(); | |
* | |
* protected readonly title$ = new BehaviorSubject<string>(''); | |
* protected readonly children$ = this.childrenStorage$.pipe(startWith<Node[]>([])); | |
* | |
* public readonly node$ = combineLatest([this.title$, this.children$]).pipe( | |
* debounceTime(0), | |
* map(([title, children]) => ({ title, children })) | |
* ); | |
* | |
* constructor(title: string, protected parent?: NodeBuilder) { | |
* this.setTitle(title); | |
* if (this.parent) { | |
* this.parent.attachChild(this.node$); | |
* } | |
* } | |
* | |
* protected attachChild(child: Observable<Node>) { | |
* this.childrenStorage$.attachSource(child, child); | |
* } | |
* | |
* setTitle(title: string) { | |
* this.title$.next(title); | |
* } | |
* } | |
* | |
* const root = new NodeBuilder('root'); | |
* const foo = new NodeBuilder('Foo', root); | |
* const bar = new NodeBuilder('Bar', root); | |
* const baz = new NodeBuilder('Baz', bar); | |
* | |
* foo.setTitle('FOOOOO!!!'); | |
* baz.setTitle('BarBaz'); | |
* | |
* root.node$.subscribe(console.log); | |
* // { "title": "root", "children": [{ "title": "FOOOOO!!!", "children": [] }, { ... }] } | |
* ``` | |
*/ | |
export class ObservableStorage<Data, Key = unknown> extends Observable<Data[]> { | |
private readonly storage = new Map<Key, Observable<Data | void>>(); | |
/** Контроль подключения и отключения источников данных */ | |
private readonly invalidated$ = new Subject<void>(); | |
/** Передает источник данных, который был отключен от хранилища */ | |
private readonly detached$ = new Subject<Observable<Data | void>>(); | |
/** Текущее состояние всех источников данных, упакованное в массив. */ | |
private readonly inputs$ = this.invalidated$.pipe( | |
map(() => [...this.storage.values()]), | |
switchMap((sources) => combineLatest(sources).pipe(defaultIfEmpty([] as Data[]))), | |
map(<T>(results: T[]) => results.filter((result: T): result is Exclude<T, void> => !!result)), | |
shareReplay({ bufferSize: 1, refCount: true }) | |
); | |
/** Промежуточная точка для передачи изменений подписчикам хранилища */ | |
private readonly output$ = new BehaviorSubject<Data[]>([]); | |
/** | |
* Внутренняя подписка, для передачи изменений из `inputs$` в `output$`. | |
* | |
* Также данная подписка устанавливает единую зависимость для внешних подписчиков. Завершение | |
* данной подписки, приведет к завершению всех внешних подписок. | |
*/ | |
private readonly inputsSubscription = this.inputs$.subscribe(this.output$); | |
constructor() { | |
// При помощи конструктора родительского класса Observable, устанавливаются правила подключения | |
// внешних слушателей к внутреннему потоку изменений хранилища | |
super((outputSubscriber) => { | |
const outputSubscription = this.output$.subscribe(outputSubscriber); | |
this.inputsSubscription.add(outputSubscription); | |
return () => outputSubscription.unsubscribe(); | |
}); | |
} | |
/** | |
* Завершение работы хранилища. Хранилище будет отключено от всех источников данных, добавленных в | |
* хранилище. Также будут отключены все внешние подписчики хранилища. При попытке подписаться к | |
* выключенному хранилищу, вернется последнее известное состояние. | |
*/ | |
complete() { | |
this.storage.clear(); | |
this.inputsSubscription.unsubscribe(); | |
this.invalidated$.complete(); | |
this.detached$.complete(); | |
this.output$.complete(); | |
} | |
/** | |
* Подключение источника данных к хранилищу. | |
* | |
* При подключении источника, происходит настройка синхронизации потока изменений источника, с | |
* потоками других источников, добавленных в хранилище. Ключевые моменты здесь - это создание | |
* подписки с многоадресной рассылкой, и контроль за моментом завершения отслеживания | |
* изменений. | |
* | |
* Внешние подписчики продолжат получать последнее значение источника, даже после завершения его | |
* потока изменений. Передача значений прекратится только после отключения источника от хранилища. | |
* | |
* @param key ключ для идентификации источника данных | |
* @param source источник данных | |
*/ | |
attachSource(key: Key, source: Observable<Data>) { | |
const storedSource = this.storage.get(key); | |
if (storedSource) this.detached$.next(storedSource); | |
const waitDetach$: Observable<boolean> = this.detached$.pipe( | |
filter((detached) => detached === sharedSource), | |
map(Boolean), | |
take(1) | |
); | |
const detachNotifier$: Observable<boolean> = iif( | |
() => this.storage.get(key) === sharedSource, | |
waitDetach$, | |
of(true) | |
); | |
const sharedSource = source.pipe( | |
observeOn(queueScheduler), | |
startWith(void 0), | |
share({ | |
connector: () => new ReplaySubject(1), | |
resetOnComplete: () => detachNotifier$, | |
resetOnRefCountZero: () => detachNotifier$, | |
}) | |
); | |
this.storage.set(key, sharedSource); | |
this.invalidated$.next(); | |
} | |
/** | |
* Отключение источника данных от хранилища. | |
* | |
* @param key ключ для идентификации источника данных | |
*/ | |
detachSource(key: Key) { | |
const storedSource = this.storage.get(key); | |
if (!storedSource) return; | |
this.storage.delete(key); | |
this.detached$.next(storedSource); | |
this.invalidated$.next(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment