Skip to content

Instantly share code, notes, and snippets.

@wmakeev
Last active March 28, 2023 11:58
Show Gist options
  • Select an option

  • Save wmakeev/74e2c0fb6d888a643d999899e9c86ac8 to your computer and use it in GitHub Desktop.

Select an option

Save wmakeev/74e2c0fb6d888a643d999899e9c86ac8 to your computer and use it in GitHub Desktop.
[$mol bach loader adaptation (concept)] #mol #batch
namespace $ {
/** Элемент очереди на массовую загрузку */
type Queue_item = {
id: string
resolve: ( value: unknown ) => void
reject: ( reason?: any ) => void
signal: AbortSignal
}
export class $bbg_batch_load extends $mol_object2 {
/**
* Максимальное кол-во элементов в очереди при котором срабатывает
* массовая загрузка
*/
static batch_size_max() {
return 100
}
/**
* Время (мс) которое выделяется на сбор элементов для массового
* запроса с момента помещеня первого элмента в очередь
*/
static batch_collect_timeout() {
return 100
}
/** Внутренний таймер */
private static timer: NodeJS.Timer | null = null
/** Очередь элементов для массовой загрузки */
private static queue: Queue_item[] = []
/** Очистка текущего таймера */
private static clear_timer() {
if( this.timer ) {
clearTimeout( this.timer )
this.timer = null
}
}
/** Сброс таймера */
private static reset_timer() {
this.clear_timer()
this.timer = setTimeout(
() => this.flush_request(),
this.batch_collect_timeout()
)
}
/**
* Реальный размер очереди по уникальным идетификаторам (идентификаторы
* могут дублироваться в очереди)
*/
private static real_queue_size() {
return new Set( this.queue.map( it => it.id ) ).size
}
private static flush_queue() {
const items = [ ...this.queue ]
this.queue = []
return items
}
/**
* Инициирует загрузку всех собранных элементов и очищает очередь.
*/
static flush_request() {
this.clear_timer()
const batch_items = this.flush_queue()
const controller = new AbortController()
let done = false
const loading_items = batch_items.filter( it => !it.signal.aborted )
let response_promise
let loading_items_count = loading_items.length
if( loading_items_count > 0 ) {
// Отменяем запрос в том случае, если отменена загрузка всех входящих
// в него элементов
for( const it of loading_items ) {
it.signal.addEventListener( 'abort', ev => {
if( --loading_items_count <= 0 && !controller.signal.aborted ) {
controller.abort()
}
} )
}
const loading_items_ids = new Set( loading_items.map( it => it.id ) )
const transport = $mol_wire_async( $bbg_transport )
// TODO Не понятно как отменять такой запрос :(
response_promise = transport.batch( [ ...loading_items_ids.values() ] )
} else {
// FIXME Срет в консоль "Uncaught (in promise) AbortError: Batch request aborted"
const abort_error = new Error( 'Batch request aborted' )
abort_error.name = 'AbortError'
response_promise = Promise.reject( abort_error )
}
response_promise
.then( data => {
if( controller.signal.aborted ) return
const entities_map = new Map<string, any>()
for( const entities of Object.values( data ) ) {
for( const [ id, entity ] of Object.entries( entities ) ) {
entities_map.set( id, entity )
}
}
for( const item of loading_items ) {
if( item.signal.aborted ) continue
const entity = entities_map.get( item.id )
if( entity ) {
item.resolve( entity )
} else {
item.reject( new Error( `Объект не найден - ${ item.id }` ) )
}
}
} )
.finally( () => {
done = true
} )
return Object.assign( response_promise, {
destructor: () => {
// Abort of done request breaks response parsing
if( !done && !controller.signal.aborted ) controller.abort()
}
} )
}
static load_async( id: string ) {
const controller = new AbortController()
let done = false
const promise = new Promise( ( resolve, reject ) => {
this.queue.push( {
id,
resolve,
reject,
signal: controller.signal
} )
// FIXME Если несколько одинаковых сущностей загружаются
// одновременно, то последующая отменяет загрузку предыдущей.
// Не знаю пока где баг у меня в коде или в атомах.
/* controller.signal.addEventListener( 'abort', ev => {
const abort_error = new Error( 'Загрузка отменена' )
abort_error.name = 'AbortError'
reject( abort_error )
} ) */
if( this.real_queue_size() >= this.batch_size_max() ) {
this.flush_request()
} else {
this.reset_timer()
}
} ).finally( () => {
done = true
} )
return Object.assign( promise, {
destructor: () => {
// Abort of done request breaks response parsing
if( !done && !controller.signal.aborted ) controller.abort()
},
} )
}
@$mol_action
static load( id: string ) {
return $mol_wire_sync( this ).load_async( id ) as any
}
}
}
// Упрощенная версия
namespace $ {
export class $bbg_batch_load2 extends $mol_object2 {
/**
* Время (мс) которое выделяется на сбор элементов для массового
* запроса с момента помещеня первого элмента в очередь
*/
static batch_collect_timeout() {
return 100
}
/**
* Максимальное кол-во элементов в очереди при котором срабатывает
* массовая загрузка
*/
static batch_size_max() {
return 100
}
private static queue = new Set<string>()
private static flush_queue() {
const items = [ ...this.queue.values() ]
this.queue.clear()
return items
}
static async flush_request() {
const batch_ids = this.flush_queue()
if( batch_ids.length === 0 ) return
const data = await $mol_wire_async( this.$.$bbg_transport ).batch( batch_ids )
const entities_map = new Map<string, any>()
for( const entities of Object.values( data ) ) {
for( const [ id, entity ] of Object.entries( entities ) ) {
entities_map.set( id, entity )
}
}
for( const id of batch_ids ) {
this.load(
id,
entities_map.get( id ) ?? new Error( `Объект не найден - ${ id }` )
)
}
}
@$mol_action
static queue_id( id: string ) {
this.queue.add( id )
}
@$mol_mem_key
static load( id: string, next?: any ) {
if( next ) return next
this.queue_id( id )
this.$.$mol_wait_timeout( this.batch_collect_timeout() )
if( this.queue.size === 0 ) {
this.$.$mol_wait_timeout( 10000 )
throw new Error( `Loading timeout - ${ id }` )
} else {
throw this.flush_request()
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment