Last active
February 29, 2024 01:43
-
-
Save garth/9aeac433e83044b0e17d2637d15de179 to your computer and use it in GitHub Desktop.
Connect emitter interface for Fireproof
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
type DownloadMetaFnParams, | |
type DownloadDataFnParams, | |
type UploadMetaFnParams, | |
type UploadDataFnParams, | |
type ChannelEmitter, | |
} from './types' | |
import { Connection } from '@fireproof/connect' | |
export type ConnectEmitterParams = { | |
emitter: ChannelEmitter | |
} | |
export class ConnectEmitter extends Connection { | |
emitter: ChannelEmitter | |
messagePromise: Promise<Uint8Array[]> | |
messageResolve?: (value: Uint8Array[] | PromiseLike<Uint8Array[]>) => void | |
constructor(params: ConnectEmitterParams) { | |
super() | |
this.emitter = params.emitter | |
this.ready = Promise.resolve() | |
this.messagePromise = new Promise<Uint8Array[]>((resolve, reject) => { | |
this.messageResolve = resolve | |
}) | |
} | |
async onConnect() { | |
if (!this.loader || !this.taskManager) { | |
throw new Error('loader and taskManager must be set') | |
} | |
this.emitter.on('update', (message) => { | |
const afn = async () => { | |
const eventBlock = await this.decodeEventBlock(message.data) | |
await this.taskManager!.handleEvent(eventBlock) | |
this.messageResolve?.([eventBlock.value.data.dbMeta]) | |
// add the cid to our parents so we delete it when we send the update | |
this.parents.push(eventBlock.cid) | |
setTimeout(() => { | |
this.messagePromise = new Promise<Uint8Array[]>((resolve, reject) => { | |
this.messageResolve = resolve | |
}) | |
}, 0) | |
} | |
void afn() | |
}) | |
void this.emitter.emit('requestHead') | |
} | |
async dataUpload(data: Uint8Array, params: UploadDataFnParams) { | |
void this.emitter.emit('store', { | |
id: `${params.name}-car-${params.car}`, | |
data, | |
}) | |
} | |
async dataDownload(params: DownloadDataFnParams) { | |
return new Promise<Uint8Array>((resolve) => { | |
const id = `${params.name}-car-${params.car}` | |
const messageHandler = (message: { id: string; data: Uint8Array | null }) => { | |
if (message.id === id) { | |
this.emitter.off('response', messageHandler) | |
if (message.data == null) { | |
throw new Error('Failure in downloading data!') | |
} | |
resolve(message.data) | |
} | |
} | |
this.emitter.on('response', messageHandler) | |
void this.emitter.emit('request', { id }) | |
}) | |
} | |
async metaUpload(bytes: Uint8Array, _params: UploadMetaFnParams) { | |
const event = await this.createEventBlock(bytes) | |
void this.emitter.emit('storeHead', { | |
id: event.cid.toString(), | |
data: event.bytes, | |
parents: this.parents.map((parent) => parent.toString()), | |
}) | |
this.parents = [event.cid] | |
return null | |
} | |
async metaDownload(_params: DownloadMetaFnParams) { | |
return this.messagePromise | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { type Connectable } from '@fireproof/connect' | |
import { ConnectEmitter } from './connect-emitter' | |
import type { ChannelEmitter } from './types' | |
const webSocketCxs = new Map<string, ConnectEmitter>() | |
export const connect = { | |
emitter: ({ name, blockstore }: Connectable, emitter: ChannelEmitter) => { | |
if (!name) { | |
throw new Error('database name is required') | |
} | |
if (webSocketCxs.has(name)) { | |
return webSocketCxs.get(name)! | |
} | |
const connection = new ConnectEmitter({ emitter }) | |
connection.connect(blockstore) | |
webSocketCxs.set(name, connection) | |
return connection | |
}, | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type Emittery from 'emittery' | |
export type ChannelEmitter = Emittery<{ | |
/** | |
* Store a message for future retrieval | |
* | |
* These messages are not broadcast to subscribers | |
*/ | |
store: { | |
id: string | |
data: Uint8Array | |
} | |
/** | |
* This event is emitted when a message is received | |
*/ | |
update: { | |
id?: string | |
data: Uint8Array | |
} | |
/** | |
* Request a stored message from the channel | |
*/ | |
request: { | |
id: string | |
} | |
/** | |
* Response to a request | |
*/ | |
response: { | |
id: string | |
/** Null if the message was not found */ | |
data: Uint8Array | null | |
} | |
/** | |
* Special message to store fireproof database updates | |
*/ | |
storeHead: { | |
id: string | |
data: Uint8Array | |
parents: string[] | |
} | |
/** | |
* Special message to request fireproof database updates | |
* | |
* The response will be sent as a series of `update` events | |
*/ | |
requestHead: undefined | |
}> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment