Last active
May 23, 2022 22:17
-
-
Save marcus-sa/49be090f5d79025795b4dbab016d50e5 to your computer and use it in GitHub Desktop.
Deepkit WebSocket RPC Cloudflare Workers Interconnection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createTestingApp } from '@deepkit/framework'; | |
import { ControllerSymbol, rpc } from '@deepkit/rpc'; | |
import { entity } from '@deepkit/type'; | |
import { | |
webSocketFetchRequestHandler, | |
CloudflareWorkerRpcWebSocketClient, | |
} from '../src'; | |
// https://miniflare.dev/testing/jest | |
/**
 * End-to-end round trip: an in-process Deepkit app answers a WebSocket
 * upgrade request, and the worker RPC client talks to it through the
 * response returned by the handler.
 */
test('client and server', async () => {
  const UserController = ControllerSymbol<Controller>('test');

  // Entity returned by the RPC action below.
  @entity.name('user')
  class User {}

  // Minimal controller exposing a single action.
  @rpc.controller(UserController)
  class Controller {
    @rpc.action()
    getUser(): User {
      return new User();
    }
  }

  const testing = createTestingApp({ controllers: [Controller] });

  // Simulate the upgrade request a WebSocket client would send.
  const upgradeRequest = new Request('http://localhost/', {
    headers: { Upgrade: 'websocket' },
  });
  const handshake = webSocketFetchRequestHandler({
    request: upgradeRequest,
    app: testing.app,
  });

  // The client is built from the handler's response rather than from a URL.
  const rpcClient = new CloudflareWorkerRpcWebSocketClient(handshake);
  await rpcClient.connect();

  const users = rpcClient.controller(UserController);
  const pendingUser = users.getUser();
  await expect(pendingUser).resolves.toBeInstanceOf(User);
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { | |
ClientTransportAdapter, | |
TransportConnectionHooks, | |
} from '@deepkit/rpc'; | |
import { RpcClient } from '@deepkit/rpc'; | |
import type { ClassType } from '@deepkit/core'; | |
/**
 * What the worker RPC client connects to: either a `Response` that is
 * expected to carry a `webSocket` property, or a URL string that is fetched
 * with an `Upgrade: websocket` header to obtain such a response.
 */
export type CloudflareWorkerRpcWebSocketClientOptions = Response | string;
// https://developers.cloudflare.com/workers/learning/using-websockets#writing-a-websocket-client | |
/**
 * Client transport adapter that obtains a WebSocket through the Workers
 * `fetch()` upgrade handshake (or from an already obtained `Response`) and
 * wires it to the RPC connection hooks.
 */
export class CloudflareWorkerRpcWebSocketClientAdapter
  implements ClientTransportAdapter
{
  /**
   * @param options A URL to fetch with an `Upgrade: websocket` header, or a
   *   `Response` that already exposes the socket via its `webSocket` property.
   */
  constructor(public options: CloudflareWorkerRpcWebSocketClientOptions) {}

  /** Address reported to the RPC layer: the URL string, or the response URL. */
  private getURL(): string {
    if (typeof this.options === 'string') return this.options;
    return this.options.url;
  }

  /**
   * Maps `ws:`/`wss:` URLs to `http:`/`https:` for the upgrade request.
   *
   * The handshake is a regular `fetch()` call, and the Workers runtime
   * rejects non-HTTP schemes there (see the `fetch_refuses_unknown_protocols`
   * compatibility flag). URLs with any other scheme are returned unchanged.
   */
  private static toFetchURL(url: string): string {
    return url.replace(/^ws(s?):/i, 'http$1:');
  }

  /**
   * Resolves the WebSocket to use for this connection.
   *
   * @throws Error when the response carries no `webSocket`, i.e. the server
   *   did not accept the upgrade.
   */
  private async getWebSocketClient(): Promise<WebSocket> {
    let response: CloudflareWorkerRpcWebSocketClientOptions = this.options;

    if (typeof this.options === 'string') {
      response = await fetch(
        CloudflareWorkerRpcWebSocketClientAdapter.toFetchURL(this.options),
        {
          headers: {
            Upgrade: 'websocket',
          },
        },
      );
    }

    // If the WebSocket handshake completed successfully, then the
    // response has a `webSocket` property.
    // @ts-expect-error
    const ws = (response as Response).webSocket;
    if (!ws) {
      throw new Error("Server didn't accept WebSocket");
    }

    return ws;
  }

  /**
   * Accepts the socket, forwards its close/error/message events to
   * `connection`, and then reports the connection as established.
   *
   * @param connection Hooks supplied by the RPC client.
   */
  async connect(connection: TransportConnectionHooks): Promise<void> {
    const ws = await this.getWebSocketClient();

    // Call accept() to indicate that you'll be handling the socket here
    // in JavaScript, as opposed to returning it on to a client.
    // @ts-expect-error
    ws.accept();

    ws.addEventListener('close', () => connection.onClose());
    ws.addEventListener('error', (err: any) => connection.onError(err));
    ws.addEventListener('message', (event: MessageEvent) =>
      connection.onData(new Uint8Array(event.data)),
    );

    // TODO: Figure out whether or not "open" event listener is needed
    // ws.addEventListener('open', () => {
    connection.onConnected({
      clientAddress: () => this.getURL(),
      send: (message: Uint8Array) => ws.send(message),
      close: () => ws.close(),
    });
    // });
  }
}
// Cloudflare Worker <-> Cloudflare Worker | |
/**
 * RPC client for worker-to-worker communication; a thin `RpcClient` subclass
 * preconfigured with `CloudflareWorkerRpcWebSocketClientAdapter`.
 */
export class CloudflareWorkerRpcWebSocketClient extends RpcClient {
  /**
   * @param options URL or `Response` handed straight to the adapter.
   */
  constructor(options: CloudflareWorkerRpcWebSocketClientOptions) {
    super(new CloudflareWorkerRpcWebSocketClientAdapter(options));
  }

  /**
   * Creates a client that targets the host of the given request.
   *
   * The scheme is `wss` when the request URL's protocol starts with `https`,
   * otherwise `ws`.
   *
   * @param request Request whose URL supplies the protocol and host.
   * @param baseUrl Optional path appended after the host; a leading `/` is
   *   added when it is non-empty and lacks one.
   * @returns A new instance of the class this method is called on.
   */
  static fromCurrentRequest<T extends ClassType<RpcClient>>(
    this: T,
    request: Request,
    baseUrl: string = '',
  ): InstanceType<T> {
    const { protocol, host } = new URL(request.url);
    const scheme = protocol.startsWith('https') ? 'wss' : 'ws';
    const path =
      baseUrl === '' || baseUrl[0] === '/' ? baseUrl : '/' + baseUrl;

    return new (this as any)(`${scheme}://${host}${path}`);
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createRpcConnection } from '@deepkit/framework'; | |
import { FetchRequestHandlerOptions } from './types'; | |
import { RpcKernel } from '@deepkit/rpc'; | |
import { InjectorContext } from '@deepkit/injector'; | |
/**
 * Handles a WebSocket upgrade request by binding one end of a new
 * `WebSocketPair` to a Deepkit RPC connection and returning the other end
 * to the caller.
 *
 * @param options.request Incoming request; must carry `Upgrade: websocket`
 *   (compared case-insensitively).
 * @param options.app App from which `RpcKernel` and `InjectorContext` are
 *   resolved.
 * @returns A 101 response carrying the client socket, or a 426 response
 *   (with an `Upgrade: websocket` header) when the request is not a
 *   WebSocket upgrade.
 */
export function webSocketFetchRequestHandler<M>({
  request,
  app,
}: FetchRequestHandlerOptions<M>): Response {
  // The upgrade token is case-insensitive, so normalize before comparing.
  const upgrade = request.headers.get('Upgrade');
  if (upgrade === null || upgrade.trim().toLowerCase() !== 'websocket') {
    // A 426 response has to tell the client which protocol is required.
    return new Response('Expected WebSocket', {
      status: 426,
      headers: { Upgrade: 'websocket' },
    });
  }

  // To accept the WebSocket request, we create a WebSocketPair (which is like a socket pair,
  // i.e. two WebSockets that talk to each other), we return one end of the pair in the
  // response, and we operate on the other end. Note that this API is not part of the
  // Fetch API standard; unfortunately, the Fetch API / Service Workers specs do not define
  // any way to act as a WebSocket server today.
  const pair = new WebSocketPair();
  const [client, server] = Object.values(pair);

  // @ts-ignore
  server.accept();

  const rpcKernel = app.get(RpcKernel);
  const injectorContext = app.get(InjectorContext);

  const connection = createRpcConnection(injectorContext, rpcKernel, {
    close: () => server.close(),
    write: (buffer: Uint8Array) => server.send(buffer),
    clientAddress: () => {
      // Get the client's IP address
      const ip = request.headers.get('CF-Connecting-IP');
      if (!ip) throw new Error('No IP address');
      return ip;
    },
  });

  server.addEventListener('close', () => connection.close());
  server.addEventListener('message', (event: MessageEvent) =>
    connection.feed(new Uint8Array(event.data)),
  );

  // Now we return the other end of the pair to the client.
  return new Response(null, { status: 101, webSocket: client });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { App } from '@deepkit/app'; | |
/**
 * Arguments accepted by fetch-style request handlers.
 *
 * @typeParam M Type argument forwarded to `App`.
 */
export interface FetchRequestHandlerOptions<M> {
  /** The incoming request to handle. */
  readonly request: Request;
  /** The Deepkit application that serves the request. */
  readonly app: App<M>;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment