Last active
March 29, 2024 08:14
-
-
Save zerkalica/29807d3385d9927e81d4084abf760a68 to your computer and use it in GitHub Desktop.
web-socket
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace $ { | |
const rec = $mol_data_record | |
const cnst = $mol_data_const | |
const str = $mol_data_string | |
const bool = $mol_data_boolean | |
const opt = $mol_data_optional | |
const nul = $mol_data_nullable | |
export const $gd_kit_object_ws_ins_data = rec({ | |
...$gd_kit_prop_data.config, | |
t: opt(str), | |
g: opt(str), | |
is_end: opt(nul(bool)), | |
}) | |
export const $gd_kit_object_ws_hide_data = rec({ | |
p: opt(str), | |
o: opt(str), | |
}) | |
export const $gd_kit_object_ws_message = $mol_data_variant( | |
rec({ | |
type: cnst('pg-cmd-ins' as const), | |
data: $gd_kit_object_ws_ins_data | |
}), | |
rec({ | |
type: cnst('pg-cmd-upd' as const), | |
data: $gd_kit_object_ws_ins_data | |
}), | |
rec({ | |
type: cnst('pg-cmd-hide' as const), | |
data: $gd_kit_object_ws_hide_data | |
}), | |
rec({ | |
type: cnst('error' as const), | |
data: str, | |
}), | |
cnst('pong' as const) | |
) | |
export type $gd_kit_object_ws_command = | |
| { | |
type: 'auth' | |
data: { | |
token: string | |
db: string | |
} | |
} | 'ping' | |
export class $gd_kit_object_ws extends $gd_core_ws<$gd_kit_object_ws_command> { | |
override on_data( data: unknown ) { | |
if (data === 'pong') return this.watchdog(null) | |
return super.on_data(data) | |
} | |
id_prefix() { | |
return '' | |
} | |
protected id_create() { | |
return this.id_prefix() + '#' + (super.id_create() ?? 'unk') | |
} | |
protected tenant_id() { | |
return this.$.$gd_kit_init_main.tenant_id() | |
} | |
override on_object( raw: Object ) { | |
let obj: typeof $gd_kit_object_ws_message.Value | undefined | |
try { | |
obj = $gd_kit_object_ws_message(raw as any) | |
} catch (e) { | |
if ($mol_promise_like(e)) $mol_fail_hidden(e) | |
return // ignore unknown message | |
} | |
if (! obj || typeof obj === 'string') return | |
if (obj.type === 'pg-cmd-ins') return this.pg_cmd_ins({ ...obj.data, t: obj.data?.t ?? this.tenant_id(), }) | |
if (obj.type === 'pg-cmd-upd') return this.pg_cmd_upd({ t: this.tenant_id(), ...obj.data }) | |
if (obj.type === 'pg-cmd-hide') return this.pg_cmd_hide(obj.data) | |
if (obj.type === 'error') { | |
this.error({ val: $mol_wire_sync($mol_error_mix).make(obj.data, { cause: obj }) }) | |
if (obj.data === 'auth timeout') this.watchdog(obj.data) | |
} | |
} | |
ping_send() { | |
this.send('ping', true) | |
} | |
pg_cmd_ins(data: typeof $gd_kit_prop_data.Value) {} | |
pg_cmd_upd(data: typeof $gd_kit_prop_data.Value) {} | |
pg_cmd_hide(data: typeof $gd_kit_object_ws_hide_data.Value) {} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace $ { | |
export class $gd_core_ws<Send = Object> extends $mol_object { | |
watchdog_deadline() { | |
return 30000 | |
} | |
restart_delay() { | |
return 2000 | |
} | |
ping_interval() { | |
return 3000 | |
} | |
url() { | |
return '' | |
} | |
sleeping_default() { return false } | |
@ $mol_mem | |
sleeping(next?: null) { | |
const actual = this.$.$gd_kit_browser_live.hidden() | |
const hidden = actual || this.sleeping_default() | |
if (next === undefined) { | |
this.$.$mol_log3_rise({ | |
place: '$gd_core_ws.hidden()', | |
socket_id: this.id(), | |
message: hidden ? 'hidden' : 'visible', | |
}) | |
} | |
if (hidden) { | |
if (! this.closed_at) this.closed_at = new Date() | |
} | |
return hidden | |
} | |
id() { | |
return $mol_wire_probe(() => this.ws())?.socket_id | |
} | |
protected id_create() { | |
return $mol_guid() | |
} | |
restart(reason = 'User') { | |
this.ws(reason) | |
} | |
@ $mol_mem | |
protected ws(reason?: string) { | |
this.sleeping() | |
if (reason) { | |
this.$.$mol_log3_rise({ | |
place: '$gd_core_ws.ws()', | |
socket_id: this.id(), | |
message: `restart by ${reason}`, | |
}) | |
} | |
const socket_id = this.id_create() | |
let destructing = false | |
let ws | |
let url | |
try { | |
ws = new WebSocket( url = this.url() ) | |
} catch (e) { | |
if ($mol_promise_like(e) || e instanceof $mol_error_mix) $mol_fail_hidden(e) | |
$mol_fail_hidden($mol_wire_sync($mol_error_mix).make( | |
'Не могу создать сокет ', | |
{ | |
url, | |
socket_id, | |
reason | |
}, | |
e as Error | |
)) | |
} | |
const socket = Object.assign( | |
ws, | |
{ | |
socket_id, | |
destructor: () => { | |
if (destructing) return | |
destructing = true | |
this.$.$mol_log3_rise({ | |
place: '$gd_core_ws.ws()#destructor', | |
socket_id: socket_id, | |
message: 'destruct', | |
}) | |
socket.close() | |
} | |
} | |
) | |
socket.onerror = socket.onclose = $mol_wire_async( | |
(e: Event | CloseEvent) => destructing || this.on_close(e) | |
) | |
socket.onmessage = $mol_wire_async((message: MessageEvent) => this.on_message(message)) | |
socket.onopen = $mol_wire_async(() => this.on_open()) | |
this.$.$mol_log3_rise({ | |
place: '$gd_core_ws.ws()', | |
socket_id: socket_id, | |
message: 'init', | |
}) | |
this.error(null) | |
return socket | |
} | |
protected sends = [] as (string | ArrayBufferLike | Blob | ArrayBufferView)[] | |
protected on_open() { | |
this.error(null) | |
this.sleeping(null) | |
this.sends_flush() | |
} | |
protected sends_flush() { | |
const ws = this.ws() | |
while (ws.readyState === WebSocket.OPEN && this.sends.length) { | |
const blob = this.sends[0] | |
ws.send(blob) | |
this.sends.shift() | |
} | |
} | |
@ $mol_action | |
send(data: Parameters<ReturnType<typeof this.ws>['send']>[0], skip_closed = false) { | |
const ws = this.ws() | |
if (ws.readyState === WebSocket.OPEN) ws.send(data) | |
else if (! skip_closed) this.sends.push(data) | |
} | |
send_object(data: Send) { | |
return this.send(JSON.stringify(data)) | |
} | |
@ $mol_mem | |
static ping_off(next?: boolean | null) { | |
return Boolean(this.$.$mol_state_arg.value( | |
'gd_core_ws_ping_off', | |
next === undefined ? undefined : (next ? '' : null) | |
)) | |
} | |
protected closed_at = undefined as undefined | Date | |
@ $mol_action | |
replay_from(from_date: Date) {} | |
@ $mol_mem | |
protected ping_off() { | |
return this.$.$gd_core_ws.ping_off() | |
} | |
on_message(e: MessageEvent) { | |
const task = e.data | |
try { | |
this.on_data(task) | |
} catch (e) { | |
if( $mol_promise_like( e ) ) $mol_fail_hidden( e ) | |
const val = $mol_wire_sync($mol_error_mix).make('Can\'t parse message', { task }, e as Error) | |
this.error({ val }) | |
} | |
} | |
on_data(data: unknown) { | |
this.on_object(typeof data === 'string' ? JSON.parse(data) : undefined) | |
} | |
on_object(e: Object) {} | |
restartable(event: Event & { code?: number }) { | |
return event.type === 'close' && event.code !== 1000 | |
} | |
protected on_close(event: Event) { | |
if (! this.closed_at) this.closed_at = new Date() | |
const prev = this.error()?.val | |
const prev_event = prev instanceof $gd_core_ws_error ? prev.cause : undefined | |
const delay = this.restart_delay() | |
const restartable = delay && (! prev_event || this.restartable(prev_event)) && this.restartable(event) | |
const err = new $gd_core_ws_error(event) | |
this.$.$mol_log3_rise({ | |
place: '$gd_core_ws.on_close()', | |
socket_id: this.id(), | |
message: err?.cause?.type === 'error' ? 'error' : 'close', | |
restartable: restartable ? 'connecting' : 'sleep', | |
}) | |
this.error({ val: err }) | |
if (restartable) this.watchdog('event ' + ( event.type || 'unknown' ), delay) | |
} | |
@ $mol_action | |
ping_send() { | |
const buf = new Uint8Array([ 0x9 ]) | |
this.send(buf) | |
} | |
@ $mol_mem | |
protected heartbeat(next?: null): $mol_after_timeout | undefined { | |
if (! this.ping_interval() || this.ping_off() || this.sleeping()) return | |
this.ws() | |
this.ping_send() | |
return new this.$.$mol_after_timeout( | |
this.ping_interval(), | |
$mol_wire_async(() => this.heartbeat(null)) | |
) | |
} | |
@ $mol_mem | |
protected watchdog(reason?: null | string, timeout = this.watchdog_deadline()) { | |
if (! timeout || this.ping_off() || this.sleeping()) return | |
this.ws() | |
return Object.assign(new this.$.$mol_after_timeout( | |
timeout, | |
$mol_wire_async(() => this.ws(reason ?? 'watchdog timeout')) | |
), { reason }) | |
} | |
@ $mol_mem | |
error(next?: null | { val: Error }) { | |
if (next?.val) { | |
this.$.$mol_fail_log(next.val) | |
} | |
return next ?? null | |
} | |
init() {} | |
@ $mol_mem | |
protected init_once() { | |
this.ws() | |
this.init() | |
} | |
restart_reason() { | |
return this.watchdog()?.reason | |
} | |
@ $mol_mem | |
status(): 'error' | 'open' | 'connecting' | 'sleep' { | |
if (this.sleeping()) return 'sleep' | |
let state: number = WebSocket.CONNECTING | |
try { | |
state = this.ws().readyState | |
} catch (e) { | |
this.$.$mol_fail_log(e) | |
if (e instanceof Error) { | |
this.error({ val: e }) | |
return 'error' | |
} | |
return 'connecting' | |
} | |
if (state === WebSocket.OPEN) this.init_once() | |
this.heartbeat() | |
const reason = this.watchdog()?.reason | |
if (this.closed_at) { | |
this.replay_from(this.closed_at) | |
this.$.$mol_log3_rise({ | |
place: '$gd_core_ws.status()', | |
socket_id: this.id(), | |
message: 'replay from', | |
closed_at: this.closed_at.toISOString(), | |
}) | |
this.closed_at = undefined | |
} | |
if (this.error()) return 'error' | |
if (reason) return 'connecting' | |
if (state === WebSocket.CONNECTING) return 'connecting' | |
if (state === WebSocket.CLOSING) return 'sleep' | |
if (state === WebSocket.CLOSED) return 'sleep' | |
return 'open' | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment