Skip to content

Instantly share code, notes, and snippets.

@guest271314
Last active August 2, 2025 20:20
Show Gist options
  • Save guest271314/d330c7cea513f12ef7bf523c56431453 to your computer and use it in GitHub Desktop.
Save guest271314/d330c7cea513f12ef7bf523c56431453 to your computer and use it in GitHub Desktop.
JavaScript runtime agnostic WebSocket server
// deno bundle https://raw.githubusercontent.com/kawanet/sha1-uint8array/main/lib/sha1-uint8array.ts sha1-uint8array-bundle.js
// bun build --minify sha1-uint8array-bundle.js --outfile=sha1-uint8array.min.js
// SHA-1 implementation (minified bundle of kawanet/sha1-uint8array; see the build commands above).
// Exports `createHash`: `createHash("sha1")` returns a hash object whose `update(data)` accepts a
// string (hashed as UTF-8) or a typed array and returns the hash object for chaining, and whose
// `digest()` returns the 20-byte digest as a Uint8Array (`digest("hex")` returns a hex string).
// An algorithm name other than "sha1" throws "Digest method not supported".
var z=function(t){if(t&&!w[t]&&!w[t.toLowerCase()])throw new Error("Digest method not supported");return new E},p=function(t,e,i,s){if(t===0)return e&i|~e&s;if(t===2)return e&i|e&s|i&s;return e^i^s},B=function(){return new Uint8Array(new Uint16Array([65279]).buffer)[0]===254},y=[1518500249|0,1859775393|0,2400959708|0,3395469782|0],w={sha1:1};class E{A=1732584193|0;B=4023233417|0;C=2562383102|0;D=271733878|0;E=3285377520|0;_byte;_word;_size=0;_sp=0;constructor(){if(!u||_>=8000)u=new ArrayBuffer(8000),_=0;this._byte=new Uint8Array(u,_,80),this._word=new Int32Array(u,_,20),_+=80}update(t){if(typeof t==="string")return this._utf8(t);if(t==null)throw new TypeError("Invalid type: "+typeof t);const{byteOffset:e,byteLength:i}=t;let s=i/64|0,r=0;if(s&&!(e&3)&&!(this._size%64)){const h=new Int32Array(t.buffer,e,s*16);while(s--)this._int32(h,r>>2),r+=64;this._size+=r}if(t.BYTES_PER_ELEMENT!==1&&t.buffer){const h=new Uint8Array(t.buffer,e+r,i-r);return this._uint8(h)}if(r===i)return this;return this._uint8(t,r)}_uint8(t,e){const{_byte:i,_word:s}=this,r=t.length;e=e|0;while(e<r){const f=this._size%64;let h=f;while(e<r&&h<64)i[h++]=t[e++];if(h>=64)this._int32(s);this._size+=h-f}return this}_utf8(t){const{_byte:e,_word:i}=this,s=t.length;let r=this._sp;for(let f=0;f<s;){const h=this._size%64;let n=h;while(f<s&&n<64){let o=t.charCodeAt(f++)|0;if(o<128)e[n++]=o;else if(o<2048)e[n++]=192|o>>>6,e[n++]=128|o&63;else if(o<55296||o>57343)e[n++]=224|o>>>12,e[n++]=128|o>>>6&63,e[n++]=128|o&63;else if(r)o=((r&1023)<<10)+(o&1023)+65536,e[n++]=240|o>>>18,e[n++]=128|o>>>12&63,e[n++]=128|o>>>6&63,e[n++]=128|o&63,r=0;else r=o}if(n>=64)this._int32(i),i[0]=i[16];this._size+=n-h}return this._sp=r,this}_int32(t,e){let{A:i,B:s,C:r,D:f,E:h}=this,n=0;e=e|0;while(n<16)c[n++]=x(t[e++]);for(n=16;n<80;n++)c[n]=a(c[n-3]^c[n-8]^c[n-14]^c[n-16]);for(n=0;n<80;n++){const 
o=n/20|0,b=A(i)+p(o,s,r,f)+h+c[n]+y[o]|0;h=f,f=r,r=g(s),s=i,i=b}this.A=i+this.A|0,this.B=s+this.B|0,this.C=r+this.C|0,this.D=f+this.D|0,this.E=h+this.E|0}digest(t){const{_byte:e,_word:i}=this;let s=this._size%64|0;e[s++]=128;while(s&3)e[s++]=0;if(s>>=2,s>14){while(s<16)i[s++]=0;s=0,this._int32(i)}while(s<16)i[s++]=0;const r=this._size*8,f=(r&4294967295)>>>0,h=(r-f)/4294967296;if(h)i[14]=x(h);if(f)i[15]=x(f);return this._int32(i),t==="hex"?this._hex():this._bin()}_hex(){const{A:t,B:e,C:i,D:s,E:r}=this;return l(t)+l(e)+l(i)+l(s)+l(r)}_bin(){const{A:t,B:e,C:i,D:s,E:r,_byte:f,_word:h}=this;return h[0]=x(t),h[1]=x(e),h[2]=x(i),h[3]=x(s),h[4]=x(r),f.slice(0,20)}}var c=new Int32Array(80),u,_=0,l=(t)=>(t+4294967296).toString(16).substr(-8),d=(t)=>t<<24&4278190080|t<<8&16711680|t>>8&65280|t>>24&255,F=(t)=>t,x=B()?F:d,a=(t)=>t<<1|t>>>31,A=(t)=>t<<5|t>>>27,g=(t)=>t<<30|t>>>2;export{z as createHash};
// JavaScript runtime agnostic WebSocket server
//
// Fork of https://gist.github.com/d0ruk/3921918937e234988dfaccfdee781bd3
//
// The Definitive Guide to HTML5 WebSocket by Vanessa Wang, Frank Salim, and Peter Moskovits
// p. 51, Building a Simple WebSocket Server
//
// guest271314 2025
// Do What the Fuck You Want to Public License WTFPLv2 http://www.wtfpl.net/about/
class WebSocketConnection {
readable;
writable;
writer;
incomingStream = new ReadableStream({
start: (_) => {
return this.incomingStreamController = _;
},
});
buffer = new ArrayBuffer(0, { maxByteLength: 1024 ** 2 });
closed = !1;
opcodes = { TEXT: 1, BINARY: 2, PING: 9, PONG: 10, CLOSE: 8 };
constructor(readable, writable) {
this.readable = readable;
if (writable instanceof WritableStreamDefaultWriter) {
this.writer = writable;
} else if (writable instanceof WritableStream) {
this.writable = writable;
this.writer = this.writable.getWriter();
}
}
async processWebSocketStream() {
try {
for await (const frame of this.readable) {
if (!this.closed) {
const { byteLength } = this.buffer;
this.buffer.resize(byteLength + frame.length);
const view = new DataView(this.buffer);
for (let i = 0, j = byteLength; i < frame.length; i++, j++) {
view.setUint8(j, frame.at(i));
}
const processedFrame = await this.processFrame();
if (processedFrame === this.opcodes.CLOSE) {
console.log(processedFrame);
break;
}
} else {
break;
}
}
console.log("WebSocket connection closed.");
} catch (e) {
console.log(navigator.userAgent, e);
console.trace();
}
}
async processFrame() {
let length, maskBytes;
const buffer = new Uint8Array(this.buffer),
view = new DataView(buffer.buffer);
if (buffer.length < 2) {
return !1;
}
let idx = 2,
b1 = view.getUint8(0),
fin = b1 & 128,
opcode = b1 & 15,
b2 = view.getUint8(1),
mask = b2 & 128;
length = b2 & 127;
if (length > 125) {
if (buffer.length < 8) {
return !1;
}
if (length == 126) {
length = view.getUint16(2, !1);
idx += 2;
} else if (length == 127) {
if (view.getUint32(2, !1) != 0) {
await this.close(1009, "");
return this.opcodes.CLOSE;
}
length = view.getUint32(6, !1);
idx += 8;
}
}
if (buffer.length < idx + 4 + length) {
return !1;
}
maskBytes = buffer.subarray(idx, idx + 4);
idx += 4;
let payload = buffer.subarray(idx, idx + length);
payload = this.unmask(maskBytes, payload);
this.incomingStreamController.enqueue({ opcode, payload });
if (this.buffer.byteLength === 0 && this.closed) {
return !0;
}
if (idx + length === 0) {
return !1;
}
for (let i = 0, j = idx + length; j < this.buffer.byteLength; i++, j++) {
view.setUint8(i, view.getUint8(j));
}
this.buffer.resize(this.buffer.byteLength - (idx + length));
return opcode === this.opcodes.CLOSE ? opcode : !0;
}
async send(obj) {
let opcode, payload;
if (obj instanceof Uint8Array) {
opcode = this.opcodes.BINARY;
payload = obj;
} else if (typeof obj == "string") {
opcode = this.opcodes.TEXT;
payload = new TextEncoder().encode(obj);
} else {
throw new Error("Cannot send object. Must be string or Uint8Array");
}
await this.writeFrame(opcode, payload);
}
async writeFrame(opcode, buffer) {
await this.writer.ready;
if (opcode === this.opcodes.TEXT) {
return await this.writer.write(this.encodeMessage(opcode, buffer))
.catch(console.lconsole.logog);
}
if (opcode === this.opcodes.BINARY) {
return await this.writer.write(this.encodeMessage(opcode, buffer))
.catch(console.log);
}
if (opcode === this.opcodes.PING) {
return await this.writer.write(
this.encodeMessage(this.opcodes.PONG, buffer),
)
.catch(console.log);
}
/*
case this.opcodes.PONG:
break;
*/
if (opcode === this.opcodes.CLOSE) {
const view = new DataView(buffer.buffer);
let code, reason;
if (buffer.length >= 2) {
code = view.getUint16(0, !1);
reason = buffer.subarray(2);
}
return await this.close(code, reason)
.then(({ closeCode, reason }) => console.log({ closeCode, reason }));
} else {
return await this.close(1002, "unknown opcode");
}
}
async close(code, reason) {
const opcode = this.opcodes.CLOSE;
let buffer, view;
if (code) {
buffer = new Uint8Array(reason.length + 2);
view = new DataView(buffer.buffer);
view.setUint16(0, code, !1);
buffer.set(reason, 2);
} else {
buffer = new Uint8Array(0);
}
// console.log({ opcode, reason, buffer }, new TextDecoder().decode(reason));
this.incomingStreamController.close();
await this.writer.write(this.encodeMessage(opcode, buffer))
.catch(console.log);
await this.writer.close();
await this.writer.closed;
await Promise.allSettled([
this.readable.cancel(),
]).catch(console.log);
this.buffer.resize(0);
this.closed = !0;
const closeCodes = {
closeCode: view.getUint16(0, !1),
reason: new TextDecoder().decode(reason),
};
if (closeCodes.closeCode === 1000) {
console.log(closeCodes);
}
return closeCodes;
}
unmask(maskBytes2, data) {
let payload = new Uint8Array(data.length);
for (let i = 0; i < data.length; i++) {
payload[i] = maskBytes2[i % 4] ^ data[i];
}
return payload;
}
encodeMessage(opcode, payload) {
// https://codereview.stackexchange.com/a/297758/47730
let buffer, b1 = 128 | opcode, b2 = 0, length = payload.length, index;
const extra = [2, 4, 10];
if (length < 126) {
index = 0;
b2 |= length;
} else if (length < 65536) {
index = 1;
b2 |= 126;
} else {
index = 2;
b2 |= 127;
}
buffer = new Uint8Array(payload.length + extra[index]);
const view = new DataView(buffer.buffer);
view.setUint8(0, b1);
view.setUint8(1, b2);
if (length >= 126 && length < 65536) {
view.setUint16(2, length);
} else if (length >= 65536) {
view.setUint32(2, 0, false);
view.setUint32(6, length, false);
}
buffer.set(payload, extra[index]);
return buffer;
}
static KEY_SUFFIX = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
// https://codereview.stackexchange.com/a/297758/47730
static async hashWebSocketKey(secKeyWebSocket, writable) {
// Use Web Cryptography API crypto.subtle where defined
console.log(secKeyWebSocket, globalThis?.crypto?.subtle);
const encoder = new TextEncoder();
let key;
if (globalThis?.crypto?.subtle) {
key = btoa(
[
...new Uint8Array(
await crypto.subtle.digest(
"SHA-1",
encoder.encode(
`${secKeyWebSocket}${WebSocketConnection.KEY_SUFFIX}`,
),
),
),
].map((s) => String.fromCodePoint(s)).join(""),
);
} else {
// txiki.js does not support Web Cryptography API crypto.subtle
// Use txiki.js specific tjs:hashing or
// https://raw.githubusercontent.com/kawanet/sha1-uint8array/main/lib/sha1-uint8array.ts
const { createHash } = await import("./sha1-uint8array.min.js");
const hash = createHash("sha1").update(
`${secKeyWebSocket}${WebSocketConnection.KEY_SUFFIX}`,
).digest();
key = btoa(
String.fromCodePoint(...hash),
);
}
const header = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-Websocket-Accept: " + key + "\r\n\r\n";
return writable instanceof WritableStream
? new Response(header).body.pipeTo(writable, { preventClose: true })
: writable.write(encoder.encode(header));
}
}
export { WebSocketConnection };
@guest271314
Copy link
Author

guest271314 commented Apr 28, 2025

Client test code

// Client test using WebSocketStream: writes 1 MiB of "a" bytes in 64 KiB
// chunks, reading one message after every write, then writes one text
// message, closes with code 1000 and logs how the streams settled.
// Presumably the server echoes every message back — confirm against the
// server under test.

// Only aborts *before* the handshake
var abortable = new AbortController();
var {
  signal,
} = abortable;
var wss = new WebSocketStream("ws://127.0.0.1:44818", {
  signal,
});
console.log(wss);

// NOTE(review): if `opened` rejects, `.catch(console.warn)` yields undefined
// and this destructuring throws.
var {
  readable,
  writable,
} = await wss.opened.catch(console.warn);

// Resolves to a human-readable summary of how the connection ended.
var connection = wss.closed.then(({ closeCode, reason }) => {
  return `WebSocketStream closed. closeCode: ${closeCode}, reason: ${reason}`
}).catch((e) => {
  return e.message;
});

var writer = writable.getWriter();
var reader = readable.getReader();
// Running total of bytes received.
var len = 0;
var encoder = new TextEncoder();
var decoder = new TextDecoder();
// 1 MiB payload made of the byte 97 ("a").
var data = new Uint8Array(1024 ** 2).fill(97);
for (let i = 0; i < data.length; i += 65536) {
  try {
    await writer.ready;
    await writer.write(data.subarray(i, i + 65536));
    // console.log(writer.desiredSize);
    const {
      value: v,
      done,
    } = await reader.read();
    if (typeof v === "string") {
      console.log(v);
    } else {
      const decoded = decoder.decode(v, {
        stream: true,
      });
      // Logs the running byte count, the chunk, and whether it is all "a".
      console.log(
        len += v.byteLength,
        v,
        [...decoded].every((s) => s === "a"),
      );
    }
  } catch (e) {
    console.warn(e);
  }
}

// Every byte written should have been read back by now.
console.assert(len === data.buffer.byteLength, [len, data.buffer.byteLength]);
console.log(len, data.buffer.byteLength);
await writer.ready;
await writer.write("Text").then(() => reader.read()).then(console.log).catch(
  console.warn,
);

wss.close({closeCode:1000, reason:"Done streaming"});
reader.releaseLock();
await writer.ready;
await writer.close();
await writer.closed;
writer.releaseLock();


// Returns its (bound) first argument; used below to report the lock state
// captured when the handlers were bound, whichever way the promise settles.
function handleClose(args) {
  return args;
}
await Promise.allSettled([
  reader.closed
    .then(handleClose.bind(null, `readable.locked ${readable.locked}`))
    .catch(handleClose.bind(null, `readable.locked ${readable.locked}`)),
  writer.closed
    .then(handleClose.bind(null, `writable.locked ${writable.locked}`))
    .catch(handleClose.bind(null, `writable.locked ${writable.locked}`)),
  connection,
])
  .then((result) => console.log(result));

// Client test using the classic WebSocket API: on "open" it sends 1 MiB of
// "a" bytes in 64 KiB messages, counts the bytes received in "message"
// events, and closes with code 1000 once the count equals the amount sent.
var ws = new WebSocket("ws://127.0.0.1:44818");
console.log(ws);
ws.binaryType = "arraybuffer";
ws.addEventListener("open", (e) => {
  console.log(e.type);
  write();
});
ws.addEventListener("close", (e) => {
  console.log(e.type, e.code, e.reason);
});
ws.addEventListener("message", (e) => {
  const v = e.data;
  if (typeof v === "string") {
    console.log(v);
  } else {
    const decoded = decoder.decode(v, {
      stream: true,
    });
    // Logs the running byte count and whether the chunk is all "a".
    console.log(len += v.byteLength, [...decoded].every((s) => s === "a"));
  }
  if (len === data.buffer.byteLength) {
    console.log(ws.bufferedAmount, ws.readyState);
    ws.close(1000, "Done streaming");
  }
});
ws.addEventListener("error", (e) => {
  console.log(e.type, e.message);
});

// These `var` declarations are hoisted, and the assignments run before any
// of the event listeners above can fire, so the handlers see initialized values.
var len = 0;
var encoder = new TextEncoder();
var decoder = new TextDecoder();
// 1 MiB payload made of the byte 97 ("a").
var data = new Uint8Array(1024 ** 2).fill(97);

// Sends `data` in 64 KiB slices, logging `bufferedAmount` before each send.
function write() {
  for (let i = 0; i < data.length; i += 65536) {
    try {
      console.log(ws.bufferedAmount);
      ws.send(data.subarray(i, i + 65536));
    } catch (e) {
      console.warn(e);
    }
  }
}

@guest271314
Copy link
Author

guest271314 commented Apr 29, 2025

Deno

import { WebSocketConnection } from "./websocket-server.js";

// Deno TCP server: performs the WebSocket upgrade by hand on each accepted
// connection and passes every received frame to WebSocketConnection.writeFrame().
const decoder = new TextDecoder();
const listener = Deno.listen({
  port: 8080,
});
const { hostname, port, transport } = listener.addr;
console.log(
  `Listening on hostname: ${hostname}, port: ${port}, transport: ${transport}`,
);
const abortable = new AbortController();
const {
  signal,
} = abortable;
// NOTE(review): the pipe below is awaited inside the accept loop and the
// loop breaks once it settles, so connections are handled one at a time and
// the server stops after the first one ends — confirm this is intended.
for await (const conn of listener) {
  try {
    const {
      readable,
      writable,
    } = conn;
    const writer = writable.getWriter();
    await readable.pipeTo(
      new WritableStream({
        start(controller) {
          // Per-connection state lives on the sink object (`this`).
          this.ws = void 0;
          // Bytes received after the handshake are forwarded through this
          // TransformStream to the WebSocketConnection.
          const { readable: wsReadable, writable: wsWritable } =
              new TransformStream(),
            wsWriter = wsWritable.getWriter();
          Object.assign(this, { wsReadable, wsWritable, wsWriter });
        },
        async write(value, controller) {
          const request = decoder.decode(value);
          if (request.includes("Upgrade: websocket")) {
            // HTTP upgrade request: answer the handshake, then start
            // parsing frames.
            const [key] = request.match(/(?<=Sec-WebSocket-Key: ).+/i);
            await WebSocketConnection.hashWebSocketKey(
              key,
              writer,
            );
            this.ws = new WebSocketConnection(this.wsReadable, writer);
            this.ws.processWebSocketStream().catch((e) => {
              throw e;
            });
            console.log(this.ws);
            if (!this.ws.incomingStream.locked) {
              this.ws.incomingStream.pipeTo(
                new WritableStream({
                  write: async ({ opcode, payload }) => {
                    // A CLOSE frame without a status code is answered
                    // with a normal closure (1000).
                    if (
                      opcode === this.ws.opcodes.CLOSE &&
                      payload.buffer.byteLength === 0
                    ) {
                      console.log(
                        opcode,
                        payload,
                        this.ws.incomingStreamController,
                      );
                      try {
                        return await this.ws.close(
                          1000,
                          payload,
                        );
                      } catch (e) {
                        console.log(e);
                        console.trace();
                      }
                    }
                    await this.ws.writeFrame(opcode, payload);
                  },
                }),
              )
                .then(() => console.log("Stream closed", this.ws))
                .catch((e) => {
                  console.log(e);
                })
                .then(async () => {
                  if (!this.ws.closed) {
                    // Best-effort cleanup. The original list also referenced
                    // `tcpWriter` and `tcpReader`, which are not declared
                    // anywhere and raised a ReferenceError here; the TCP
                    // writer is already covered by `this.ws.writer`.
                    await Promise.allSettled([
                      this.ws?.writable?.close(),
                      this.ws.writer.close(),
                      this.ws.readable.cancel(),
                      this.ws.close(),
                    ]).catch(console.log);
                  }
                  console.log(`Incoming WebSocketStream closed`, this.ws);
                });
            }
          } else {
            // Anything that is not the upgrade request is treated as
            // WebSocket frame data.
            await this.wsWriter.ready;
            await this.wsWriter.write(value);
          }
        },
        close() {
          console.log("Stream closed");
        },
        abort(reason) {
          console.log({
            reason,
          });
        },
      }),
    ).then(() => {
      throw new Error("Stream aborted");
    });
  } catch (e) {
    console.log({
      e,
    });
    break;
  }
}

@guest271314
Copy link
Author

guest271314 commented Apr 29, 2025

Bun

import { WebSocketConnection } from "./websocket-server.js";
// Bun TCP server: performs the WebSocket upgrade by hand inside the `data`
// handler and passes every received frame to WebSocketConnection.writeFrame().
const decoder = new TextDecoder();
const server = Bun.listen({
  hostname: "0.0.0.0",
  port: 8080,
  socket: {
    // NOTE(review): connection state (ws, wsReadable, wsWriter) is stored on
    // `this`. If `this` is the shared handler object rather than something
    // per-socket, concurrent connections would overwrite each other — confirm.
    async data(socket, data) {
      const request = decoder.decode(data);
      if (request.includes("Upgrade: websocket")) {
        // HTTP upgrade request: answer the handshake, then start parsing frames.
        const [key] = request.match(/(?<=Sec-WebSocket-Key: ).+/i);
        const handshake = await WebSocketConnection.hashWebSocketKey(
          key,
          socket,
        );
        // Adapts the Bun socket to the WritableStream that
        // WebSocketConnection expects.
        const writable = new WritableStream({
          write(value) {
            socket.write(value);
          },
          close() {
            socket.end();
          },
        });
        this.ws = new WebSocketConnection(this.wsReadable, writable);
        this.ws.processWebSocketStream().catch((e) => {
          throw e;
        });
        console.log(this.ws);
        if (!this.ws.incomingStream.locked) {
          this.ws.incomingStream.pipeTo(
            new WritableStream({
              write: async ({ opcode, payload }) => {
                // A CLOSE frame without a status code is answered with a
                // normal closure (1000).
                if (
                  opcode === this.ws.opcodes.CLOSE &&
                  payload.buffer.byteLength === 0
                ) {
                  console.log(
                    opcode,
                    payload,
                    this.ws.incomingStreamController,
                  );
                  try {
                    return await this.ws.close(
                      1000,
                      payload,
                    );
                  } catch (e) {
                    console.log(e);
                    console.trace();
                  }
                }
                await this.ws.writeFrame(opcode, payload);
              },
            }),
          )
            .then(() => console.log("Stream closed", this.ws))
            .catch((e) => {
              console.log(e);
            })
            .then(async () => {
              // Best-effort cleanup when the connection did not close itself.
              if (!this.ws.closed) {
                await Promise.allSettled([
                  this.ws?.writable?.close(),
                  this.ws.writer.close(),
                  this.ws.readable.cancel(),
                  this.ws.close(),
                ]).catch(console.log);
              }
              console.log(`Incoming WebSocketStream closed`, this.ws);
            });
        }
      } else {
        // Anything that is not the upgrade request is treated as WebSocket
        // frame data and forwarded to the connection's input stream.
        await this.wsWriter.ready;
        await this.wsWriter.write(data);
      }
    },
    open(socket) {
      console.log("open");
      this.ws = void 0;
      // Frame bytes are forwarded through this TransformStream to the
      // WebSocketConnection created in `data`.
      const { readable: wsReadable, writable: wsWritable } =
          new TransformStream(),
        wsWriter = wsWritable.getWriter();
      Object.assign(this, { wsReadable, wsWritable, wsWriter });
    },
    close(socket) {
      console.log("Socket closed");
    },
    drain(socket) {
      console.log("drain");
    },
    error(socket, error) {
      console.log(error);
    },
  },
});

const { hostname, port } = server;

console.log(
  `Listening on hostname: ${hostname}, port: ${port}`,
);

@guest271314
Copy link
Author

guest271314 commented May 1, 2025

Node.js

import { createServer } from "node:http";
import { Duplex } from "node:stream";
import { WebSocketConnection } from "./websocket-server.js";

// Handles plain (non-upgrade) HTTP requests: replies with permissive CORS
// headers and an empty body.
// - request: incoming HTTP request (not inspected).
// - response: HTTP response; headers are set and the response is ended.
// The function name is kept as-is because the server bootstrap refers to it.
function handlerequestuest(request, response) {
  console.log("HTTP server got request");
  // The original first set Access-Control-Allow-Headers from
  // `request.header.origin`, which throws (`request.header` is undefined) and
  // was overwritten by the "*" value below anyway.
  response.setHeader("Access-Control-Allow-Origin", "*");
  response.setHeader("Access-Control-Request-Method", "*");
  response.setHeader("Access-Control-Allow-Methods", "OPTIONS, GET");
  response.setHeader("Access-Control-Allow-Headers", "*");
  // Without end() the client would wait for a response indefinitely.
  response.end();
}
// Handles the HTTP server's "upgrade" event: converts the socket to web
// streams, writes the WebSocket handshake response, then passes every
// received frame to WebSocketConnection.writeFrame().
// - request: the upgrade request; its "sec-websocket-key" header is used.
// - socket: the underlying duplex socket.
// - upgradeHead: not used here. NOTE(review): if it can contain bytes that
//   arrived together with the upgrade request, they are dropped — confirm.
// Errors are logged, not rethrown.
async function handleUpgrade(request, socket, upgradeHead) {
  console.log("HTTP server got UPGRADE");
  try {
    const { readable, writable } = Duplex.toWeb(socket);
    const writer = writable.getWriter();
    await WebSocketConnection.hashWebSocketKey(
      request.headers["sec-websocket-key"],
      writer,
    );
    const ws = new WebSocketConnection(readable, writer);
    // Started without awaiting: frame parsing runs while frames are consumed below.
    ws.processWebSocketStream().catch(
      (e) => {
        throw e;
      },
    );
    console.log(ws);

    ws.incomingStream.pipeTo(
      new WritableStream({
        write: async ({ opcode, payload }) => {
          // A CLOSE frame without a status code is answered with a normal
          // closure (1000).
          if (
            opcode === ws.opcodes.CLOSE &&
            payload.buffer.byteLength === 0
          ) {
            console.log(
              opcode,
              payload,
              ws.incomingStreamController,
            );
            try {
              return await ws.close(
                1000,
                payload,
              );
            } catch (e) {
              console.log(e);
              console.trace();
            }
          }
          await ws.writeFrame(opcode, payload);
        },
      }),
    )
      .then(() => console.log("Stream closed", ws))
      .catch((e) => {
        console.log(e);        
      }).then(async () => {
        // Best-effort cleanup when the connection did not close itself.
        if (!ws.closed) {
          await Promise.allSettled([
            ws?.writable?.close(),
            ws.writer.close(),
            ws.readable.cancel(),
            ws.close(),
          ]).catch(console.log);
        }
       console.log(`Incoming WebSocketStream closed`, ws);
      });
  } catch (e) {
    console.log(e);
  }
}

// Address the combined HTTP / WebSocket server binds to.
const host = "0.0.0.0";
const port = 8001;

// Plain HTTP requests go to handlerequestuest; upgrade requests to handleUpgrade.
const server = createServer(handlerequestuest);
server.on("upgrade", handleUpgrade);
server.on("listening", () => {
  console.log(`WebSocket server listening on ${host}:${port}`);
});
server.listen({ port, host });

@guest271314
Copy link
Author

guest271314 commented May 1, 2025

txiki.js

import { WebSocketConnection } from "./websocket-server.js";

const decoder = new TextDecoder();

// Serves one accepted TCP connection: answers the WebSocket upgrade request,
// then forwards all further bytes to a WebSocketConnection and passes every
// received frame to WebSocketConnection.writeFrame().
// - conn: accepted connection exposing `readable` and `writable` streams.
// Resolves once `conn.readable` ends.
async function handleConnection(conn) {
  const writer = conn.writable.getWriter();
  // Frame bytes are forwarded through this TransformStream to the
  // WebSocketConnection created after the handshake.
  const { readable: wsReadable, writable: wsWritable } = new TransformStream(
      {},
      {},
      {
        highWaterMark: 1,
      },
    ),
    wsWriter = wsWritable.getWriter();
  let ws;
  for await (const value of conn.readable) {
    console.log(value);
    const request = decoder.decode(value);
    if (/upgrade: websocket/i.test(request)) {
      // HTTP upgrade request: answer the handshake, then start parsing frames.
      const [key] = request.match(/(?<=Sec-WebSocket-Key: ).+/i);
      const handshake = await WebSocketConnection.hashWebSocketKey(
        key,
        writer,
      );
      ws = new WebSocketConnection(wsReadable, writer);
      ws.processWebSocketStream().catch((e) => {
        throw e;
      });
      console.log(ws);
      if (!ws.incomingStream.locked) {
        ws.incomingStream.pipeTo(
          new WritableStream({
            write: async ({ opcode, payload }) => {
              // A CLOSE frame without a status code is answered with a
              // normal closure (1000).
              if (
                opcode === ws.opcodes.CLOSE &&
                payload.buffer.byteLength === 0
              ) {
                console.log(
                  opcode,
                  payload,
                  ws.incomingStreamController,
                );
                try {
                  return await ws.close(
                    1000,
                    payload,
                  );
                } catch (e) {
                  console.log(e);
                  console.trace();
                }
              }
              await ws.writeFrame(opcode, payload);
            },
          }),
        )
          .then(() => console.log("Stream closed", ws))
          .catch((e) => {
            console.log(e);
          })
          .then(async () => {
            // Best-effort cleanup when the connection did not close itself.
            if (!ws.closed) {
              await Promise.allSettled([
                ws?.writable?.close(),
                ws.writer.close(),
                ws.readable.cancel(),
                ws.close(),
              ]).catch(console.log);
            }
            console.log(`Incoming WebSocketStream closed`, ws);
          });
      }
    } else {
      // Anything that is not the upgrade request is treated as WebSocket
      // frame data; it is copied into a Uint8Array before forwarding.
      await wsWriter.ready;
      await wsWriter.write(new Uint8Array(value));
    }
  }

  console.log("WebSocket client connection closed");
  await wsWriter.close();
}
// Accepts TCP connections on port 8080 and serves each one concurrently.
const listener = await tjs.listen("tcp", "0.0.0.0", "8080");
const { family, ip, port } = listener.localAddress;
console.log(
  `${navigator.userAgent} WebSocket server listening on family: ${family}, ip: ${ip}, port: ${port}`,
);

// Logs a failure of a single connection without stopping the server.
const reportConnectionError = (e) => {
  console.log({ e });
};

for await (const conn of listener) {
  try {
    console.log({ conn });
    // Not awaited, so the loop can keep accepting further connections.
    handleConnection(conn).catch(reportConnectionError);
  } catch (e) {
    // A synchronous failure while dispatching stops the listener.
    listener.close();
    console.log(e);
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment