完成形
https://github.com/napolab/cloudflare-workers-playground
メリデメはこんな感じ。firestore を多用する理由はデメリットが大きい反面、圧倒的なスピードを得られるから。(refetch を websocket にすべて任せられるのは強いがよい
- firestore の realtime 性はとても強力
- 大体のケースにおいて web api server を書く必要がない
- 逆に書くと遅くなるので client side sdk で完結させるように書くことが多い
- NoSQL なので変更に弱い
- rules がマジできつい
- client side sdk で DB を変更するのでセキュリティを rules という独自言語で担保するのだが、これが型がないうえに複雑な状態が持てないので roles による可視性制御がめちゃくちゃ大変(できることにはできる)
- web api server を書こうと思ったときに firebase cloudfunction という手段をとることになるが、めちゃくちゃデプロイが遅くて気がくるってしまう
- ほしい機能
- 永続化層がある
- DB にスキーマがある
- RDBMS が使える
- web api server が書ける
- websocket による変更のリアルタイム通知ができる
- デプロイが高速である
それぞれ workers にある機能に割り当てる
- 永続化層 --> D1
- DB にスキーマがある --> drizzle によるスキーマファースト開発
- web api server が書ける --> cloudflare workers そのもの
- websocket による変更のリアルタイム通知 --> DurableObjects +
WebSocketPiar
,Response
で達成可能 - デプロイが高速である --> cloudflare workers は v8 エンジンを積んでるので js を直でデプロイできることから高速である(コンテナ技術を使用していない)
いけそう
websocket で client side とコネクションを張りっぱなしにしておくためには WebSocketPair
を接続が破棄される or 接続を破棄するまでは保持しておかないといけない。
しかし、普通にサーバーレスでは in-memory で状態を持っていても次々に別のプロセスでサーバーが起動するので同じ in-memory を参照できないので connection を保持することができない。
そこで登場するのが DurableObjects という機能。
workers が stateless な function だとすると DurableObjects はa singleton の class instance とみることができる。(実際に class で記述するし....
https://blog.cloudflare.com/ja-jp/durable-objects-ga-ja-jp/
アプリケーションがそのクラスの名前付きインスタンス(Cloudflareネットワーク全体を通して必ずユニークなものになります)を作成できます。そのインスタンスが1つのDurable Objectであり、Workers(と他のDurable Objects)はもとのDurable Objectに対し、ID経由でメッセージを送ることができます。Durable Objectは送られてきたメッセージを順番にシングルスレッドで処理し、メッセージ間の調整を行います。
DurableObjects の強みは強整合であること、トランザクション系に強そうだなと思っている。DB を使わずに状態を表現できるので counter を簡単に作ることができる。
singleton とみることができるといったがブラウザの window みたいなところに生えるわけではなく、fetch
を返して network 越しに singleton instance の method をたたくという I/F になっている。
https://zenn.dev/mizchi/articles/5130b02c5b490e4f871a
つまり Workers KV は結果整合、 Durable Objects は強整合とのこと。地理的に離れてる場合はどうなるんだろう?と思ったんですが、次のような記述があります。
適当な Counter を作るならこう(下にコードを用意した)。動かしたかったらこちらで
- Hono と drizzle で CRUD を作る
- drizzle-orm + d1 で table と client を作る
- midlleware で認証入れたり cache する
- subscription api を作って websocket で変更を通知する
- 変更通知対象は DurableObjects で保持する
import { zValidator } from "@hono/zod-validator";
import { drizzle } from "drizzle-orm/d1";
import { Hono } from "hono";
import { cors } from "hono/cors";
type Bindings = {
readonly DB: D1Database;
};
type Environment = {
readonly Bindings: Bindings;
};
const app = new Hono<Environment>();
app.use("/api/*", cors());
app.get("/api/posts", async (c) => {
const db = drizzle(c.env.DB);
const data = await db.select().from(posts).all();
return c.json(data);
});
app.post(
"/api/post",
zValidator("json", z.object({ title: z.string(), body: z.string() })),
async (c) => {
const res = c.req.valid("json");
const db = drizzle(c.env.DB);
await db.insert(posts).values({
title: res.title,
body: res.body,
createdAt: new Date(),
}).run();
const result = await db.select().from(posts).all();
return c.json(result);
}
);
export default app
type Bindings = {
readonly DB: D1Database;
+ readonly SHARED_EVENT: DurableObjectNamespace;
};
const sharedEvent = (c: Context<Environment>) => (type: string) => {
const doId = c.env.SHARED_EVENT.idFromName(type);
return c.env.SHARED_EVENT.get(doId);
};
app.get("/subscribe/posts", async (c) => {
const obj = sharedEvent(c)("posts");
const response = await obj.fetch(new URL("/events", c.req.url), {
headers: c.req.headers,
});
return response;
});
WebScoketPair
を保持する DurableObjects を作る
import { Hono } from "hono";
import { wsupgrade } from "../middleware";
export class SharedEvent implements DurableObject {
private readonly app = new Hono();
private readonly sessions = new Set<WebSocket>();
constructor(private readonly state: DurableObjectState) {
this.app.get("/events", wsupgrade(), async (c) => {
const pair = new WebSocketPair();
this.handleSession(pair[1]);
return new Response(null, { status: 101, webSocket: pair[0] });
});
this.app.post("/event", async (c) => {
const data = await c.req.json();
const json = JSON.stringify(data);
for (const socket of this.sessions) {
socket.send(json);
}
});
}
private handleSession(socket: WebSocket): void {
socket.accept();
this.sessions.add(socket);
socket.addEventListener("close", () => {
this.sessions.delete(socket);
socket.close();
});
}
fetch(request: Request) {
return this.app.fetch(request);
}
}
あとは workers の endpoint に post されたときに SHARED_EVENT
に対して /event
を post することで /subscribe/xxx
から client side に websocket で通知が送られる。
- websocket の connection を何個同時接続できるのか実際にリクエストしてみた結果。7000個くらいが限界っぽい。詳しくは調べてはない。
/* eslint-disable import/no-extraneous-dependencies */
/* eslint-disable no-console */
import { client as WebSocket } from "websocket";
import type { connection as Connection } from "websocket";
const main = async () => {
const waitlist = new Set<string>();
const connections = new Set<Connection>()
process.on("SIGINT", () => {
for(const connection of connections) {
connection.close()
console.log(connection.state)
}
process.exit(0)
})
const receiveCountMap = new Map<string, number>();
{
const id = setInterval(() => {
console.log("waitlist", waitlist.size);
if (waitlist.size === 0) {
clearInterval(id);
}
}, 5000);
}
{
const id = setInterval(() => {
if(receiveCountMap.size === 0) return;
console.log("size", receiveCountMap.size);
}, 5000);
}
new Array(10000).fill(null).forEach(async (_, idx) => {
const key = `client[${idx}]`;
waitlist.add(key);
const client = new WebSocket();
client.connect("wss://<worker-url>/subscribe/posts");
// client が connected になるまで待機
const connection = await new Promise<Connection>((resolve, reject) => {
client.on("connect", (con) => resolve(con));
// client.on("connectFailed", (error) => reject(error));
});
connections.add(connection);
waitlist.delete(key);
connection.on("message", (message) => {
if (message.type === "utf8") {
const count = receiveCountMap.get(key) ?? 0;
receiveCountMap.set(key, count + 1);
}
});
connection.on("error", (error) => {
console.error(`error[${key}]`, error);
});
connection.on("close", (close) => {
console.error(`close[${key}]`, close);
});
});
};
void main()
実装によるかもだけど同時接続 7000 くらいで durable object の fetch が止まってる?
waitlist 9540
waitlist 6741
waitlist 3667
waitlist 2517
waitlist 2487
waitlist 2487
waitlist 2437
waitlist 2435
waitlist 2434
waitlist 2434
waitlist 2434
waitlist 2434
waitlist 2426
waitlist 2425
waitlist 2425
waitlist 2425
10000 件 websocket のコネクション飛ばして残り 2425 個以降進まない
size 7533 帰ってきた message の数は 7533 個だから結構落ちるなぁ
connected すらも行かないのかー