Skip to content

Instantly share code, notes, and snippets.

@robertrypula
Created February 11, 2021 20:50
Show Gist options
  • Save robertrypula/b813ffe23a9489bae1b677f1608676c8 to your computer and use it in GitHub Desktop.
Save robertrypula/b813ffe23a9489bae1b677f1608676c8 to your computer and use it in GitHub Desktop.
WebSocket - binary broadcast example (pure NodeJs implementation without any dependency)
// Copyright (c) 2019-2021 Robert Rypuła - https://github.com/robertrypula
/*
+--------------------------------------------------+
| Binary broadcast WebSocket server in pure NodeJs |
+--------------------------------------------------+
Based on the great article written by Srushtika Neelakantam:
https://medium.com/hackernoon/implementing-a-websocket-server-with-node-js-d9b78ec5ffa8
Differences to the article:
- supports only binary frames (max payload length < 64 KiB)
- supports HTTP/WS and HTTPS/WSS
- WebSocket frames are parsed even if NodeJs buffer chunks are not aligned
- THEORETICALLY it supports fragmented frames, but a fragmented message is delivered as several smaller messages
(NOT TESTED, as I never got a fragmented frame from the browser)
- sends received data in the broadcast mode to all connected clients
*/
const crypto = require('crypto');
const { createServer: createServerHttp } = require('http');
const { createServer: createServerHttps } = require('https');
const { networkInterfaces } = require('os');
const { readFileSync } = require('fs');
const IS_HTTPS = false;
const PORT = 6175;
// ---------------------------------------------------------
/*
Self-signed certificate solution:
- https://nodejs.org/en/knowledge/HTTP/servers/how-to-create-a-HTTPS-server/
openssl genrsa -out key.pem
openssl req -new -key key.pem -out csr.pem
openssl x509 -req -days 9999 -in csr.pem -signkey key.pem -out cert.pem
rm csr.pem
Let's Encrypt solution:
- https://advancedweb.hu/how-to-use-lets-encrypt-with-node-js-and-express/
- https://stackoverflow.com/questions/48078083/lets-encrypt-ssl-couldnt-start-by-error-eacces-permission-denied-open-et
*/
const HTTP_426_UPGRADE_REQUIRED = 426;

/**
 * Listener for ordinary (non-upgrade) HTTP requests.
 * Consumes the request and, once it has ended, answers with status 426 and a
 * small HTML page saying that only WebSockets are supported.
 */
const requestListener = (localRequest, localResponse) => {
  const noticePage = [
    '<!DOCTYPE html>',
    '<html lang="en">',
    ' <head><meta charSet="UTF-8"/><title>WebSocket server</title><link rel="icon" href="data:,"></head>',
    ' <body>This service supports only WebSockets</body>',
    '</html>'
  ].join('\n');

  const rejectWithUpgradeRequired = () => {
    console.log('[normal HTTP request]\n');
    localResponse.statusCode = HTTP_426_UPGRADE_REQUIRED;
    localResponse.setHeader('Upgrade', 'WebSocket');
    localResponse.setHeader('Content-Type', 'text/html; charset=UTF-8');
    localResponse.end(noticePage);
  };

  // No-op consumer: incoming body chunks are discarded (presumably registered so the
  // request stream keeps flowing and 'end' is eventually emitted).
  localRequest.on('data', () => undefined);
  localRequest.on('end', rejectWithUpgradeRequired);
};
// Build the HTTP or HTTPS server (selected by IS_HTTPS) and start listening on PORT at 0.0.0.0.
const server = (() => {
  if (!IS_HTTPS) {
    return createServerHttp(requestListener);
  }
  // key.pem / cert.pem are read from the current working directory.
  const tlsOptions = { key: readFileSync('key.pem'), cert: readFileSync('cert.pem') };
  return createServerHttps(tlsOptions, requestListener);
})();
server.listen(PORT, '0.0.0.0');
// ---------------------------------------------------------
/**
 * Lists the IPv4 address of every network interface reported by the OS.
 *
 * @returns {string[]} addresses in the order the interfaces are reported
 */
const getIpv4Addresses = () => {
  const nets = networkInterfaces();
  const ipv4Addresses = [];
  for (const name of Object.keys(nets)) {
    for (const net of nets[name]) {
      // Node.js 18.0 - 18.3 report `family` as the number 4 instead of the string 'IPv4'.
      if (net.family === 'IPv4' || net.family === 4) {
        ipv4Addresses.push(net.address);
      }
    }
  }
  return ipv4Addresses;
};
// Startup banner: a title followed by one "Waiting on ..." line per local IPv4 address.
if (server) {
  const scheme = IS_HTTPS ? 'wss' : 'ws';
  const bannerLines = [
    '',
    '------------------------------------',
    ' :: WebSocket server :: ',
    '------------------------------------',
    ''
  ];
  for (const ipv4Address of getIpv4Addresses()) {
    bannerLines.push(`Waiting on ${scheme}://${ipv4Address}:${PORT}`);
  }
  bannerLines.push('');
  for (const line of bannerLines) {
    console.log(line);
  }
}
// ---------------------------------------------------------
// Debug helper: logs a label, the buffer length ('---' when no buffer is given) and the buffer itself.
const debugBuffer = (bufferName, buffer) => {
  let shownLength = '---';
  if (buffer) {
    shownLength = buffer.length;
  }
  console.log(`:: DEBUG - ${bufferName} | ${shownLength} | `, buffer, '\n');
};
/**
 * Derives the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key:
 * base64(SHA-1(key + fixed GUID)).
 * Background on the GUID: https://stackoverflow.com/questions/13456017
 */
const getSecWebSocketAccept = acceptKey => {
  const handshakeGuid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
  const sha1 = crypto.createHash('sha1');
  sha1.update(acceptKey + handshakeGuid, 'binary');
  return sha1.digest('base64');
};
/**
 * Builds the lines of the "101" handshake response for an upgrade request.
 *
 * The Sec-WebSocket-Protocol line is only added when the client actually offered the
 * 'audio-network-reborn' subprotocol: a server must not select a subprotocol the client
 * did not offer (RFC 6455 section 4.2.2), and browsers fail the connection otherwise.
 *
 * @param req incoming upgrade request; `req.headers` is read (lower-cased header names)
 * @returns {string[]} response lines without line terminators
 */
const getUpgradeResponseHeader = req => {
  const supportedProtocol = 'audio-network-reborn';
  const responseLines = [
    'HTTP/1.1 101 Web Socket Protocol Handshake',
    'Upgrade: websocket',
    'Connection: Upgrade',
    `Sec-WebSocket-Accept: ${getSecWebSocketAccept(req.headers['sec-websocket-key'])}`
  ];
  // The client sends a comma separated list of subprotocols it is willing to speak.
  const offeredProtocols = String(req.headers['sec-websocket-protocol'] || '')
    .split(',')
    .map(protocol => protocol.trim());
  if (offeredProtocols.includes(supportedProtocol)) {
    responseLines.push(`Sec-WebSocket-Protocol: ${supportedProtocol}`);
  }
  return responseLines;
};
// ---------------------------------------------------------
// Sockets that went through the WebSocket handshake; each received payload is broadcast to all of them.
let connectedSockets = [];
server.on('upgrade', (req, socket) => {
  // Bytes received so far that do not yet form a complete frame.
  let bufferToParse = Buffer.alloc(0);
  // Set once this connection is being shut down; later chunks are ignored.
  let isClosing = false;
  // The Upgrade token is case-insensitive, clients may send "WebSocket" as well as "websocket".
  if (String(req.headers['upgrade'] || '').toLowerCase() !== 'websocket') {
    // Terminate the status line and the (empty) header section so the reply is a complete HTTP response.
    socket.end('HTTP/1.1 400 Bad Request\r\n\r\n');
    return;
  }
  socket.on('data', buffer => {
    let parsedBuffer;
    if (isClosing) {
      return;
    }
    bufferToParse = Buffer.concat([bufferToParse, buffer]);
    do {
      try {
        parsedBuffer = getParsedBuffer(bufferToParse);
      } catch (error) {
        // getParsedBuffer throws on frames it does not support (e.g. text frames or 64-bit lengths).
        // Drop only the offending connection instead of letting the exception take the process down.
        console.log(`[unsupported frame, dropping connection: ${error.message}]\n`);
        isClosing = true;
        socket.destroy();
        return;
      }
      debugBuffer('buffer', buffer);
      debugBuffer('bufferToParse', bufferToParse);
      debugBuffer('parsedBuffer.payload', parsedBuffer.payload);
      debugBuffer('parsedBuffer.bufferRemainingBytes', parsedBuffer.bufferRemainingBytes);
      if (parsedBuffer.bufferRemainingBytes === null) {
        // Close frame from the client: answer with an empty close frame (FIN + opcode 0x8, length 0)
        // and end the socket. Keeping a real Buffer here avoids Buffer.concat([null, ...]) later on.
        isClosing = true;
        bufferToParse = Buffer.alloc(0);
        socket.end(Buffer.from([0x88, 0x00]));
        break;
      }
      bufferToParse = parsedBuffer.bufferRemainingBytes;
      if (parsedBuffer.payload) {
        // The outgoing frame is identical for every receiver, so build it once.
        const frame = createWebSocketFrame(parsedBuffer.payload);
        connectedSockets = connectedSockets.filter(connectedSocket => connectedSocket.readyState === 'open');
        console.log(`[sending parsedBuffer.payload to ${connectedSockets.length} active connection(s)]\n`);
        connectedSockets.forEach(connectedSocket => connectedSocket.write(frame));
      }
    } while (parsedBuffer.payload && parsedBuffer.bufferRemainingBytes.length);
    console.log('----------------------------------------------------------------\n');
  });
  socket.on('close', () => {
    console.log('[socket close]\n');
    // Forget the socket right away instead of waiting for the next broadcast to filter it out.
    connectedSockets = connectedSockets.filter(connectedSocket => connectedSocket !== socket);
  });
  socket.on('connect', () => console.log('[socket connect]\n'));
  socket.on('drain', () => console.log('[socket drain]\n'));
  socket.on('end', () => console.log('[socket end]\n'));
  socket.on('ready', () => console.log('[socket ready]\n'));
  socket.on('error', () => console.log('[socket error]\n'));
  socket.on('timeout', () => console.log('[socket timeout]\n'));
  socket.write(getUpgradeResponseHeader(req).join('\r\n') + '\r\n\r\n');
  console.log('[new connection]\n');
  connectedSockets.push(socket);
});
// ---------------------------------------------------------
/*
https://tools.ietf.org/html/rfc6455#section-5.2
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
OpCode
%x0 denotes a continuation frame
%x1 denotes a text frame
%x2 denotes a binary frame
%x3–7 are reserved for further non-control frames
%x8 denotes a connection close
%x9 denotes a ping
%xA denotes a pong
%xB-F are reserved for further control frames
*/
// ---------------------------------------------------------
/**
 * Wraps `payload` (a Buffer shorter than 64 KiB) into a single unmasked, final binary frame.
 * Payloads of 126 bytes or more use the 16-bit extended length field.
 * Throws an Error when the payload is 64 KiB or longer.
 */
const createWebSocketFrame = payload => {
  const size = payload.length;
  const extendedLengthBytes = size >= 126 ? 2 : 0;
  const headerBytes = 2 + extendedLengthBytes;
  const frame = Buffer.alloc(headerBytes + size);
  if (size >= 2 ** 16) {
    throw new Error('Payload equal or bigger than 64 KiB is not supported');
  }
  frame.writeUInt8(0x82, 0); // FIN flag = 1, opcode = 2 (binary frame)
  if (extendedLengthBytes === 0) {
    frame.writeUInt8(size, 1);
  } else {
    frame.writeUInt8(126, 1); // 126 announces the 16-bit length that follows
    frame.writeUInt16BE(size, 2);
  }
  payload.copy(frame, headerBytes);
  return frame;
};
// ---------------------------------------------------------
/**
 * Tries to decode one WebSocket frame from the start of `buffer`.
 *
 * Result shapes:
 * - complete frame:   { payload: <unmasked bytes>, bufferRemainingBytes: <copy of the bytes after the frame> }
 * - incomplete frame: { payload: null, bufferRemainingBytes: buffer } (the very same input object)
 * - close frame:      { payload: null, bufferRemainingBytes: null }
 *
 * Throws an Error for opcodes other than binary / continuation / close and for the 64-bit length form.
 */
const getParsedBuffer = buffer => {
  const needMoreBytes = () => ({ payload: null, bufferRemainingBytes: buffer });
  if (buffer.length < 2) {
    return needMoreBytes();
  }
  const firstByte = buffer.readUInt8(0);
  const secondByte = buffer.readUInt8(1);
  const isFinalFrame = (firstByte & 0x80) !== 0;
  const opCode = firstByte & 0x0f;
  const isMasked = (secondByte & 0x80) !== 0; // https://security.stackexchange.com/questions/113297
  let payloadLength = secondByte & 0x7f;
  let cursor = 2;
  if (!isFinalFrame) {
    console.log('[not final frame detected]\n');
  }
  if (opCode === 0x8) {
    console.log('[connection close frame]\n');
    // TODO read payload, for example payload equal to <0x03 0xe9> means 1001:
    // 1001 indicates that an endpoint is "going away", such as a server
    // going down or a browser having navigated away from a page.
    // More info here: https://tools.ietf.org/html/rfc6455#section-7.4
    return { payload: null, bufferRemainingBytes: null };
  }
  if (opCode !== 0x2 && opCode !== 0x0) {
    throw new Error('Only binary and continuation frames are supported');
  }
  if (payloadLength === 127) {
    throw new Error('Payload equal or bigger than 64 KiB is not supported');
  }
  if (payloadLength === 126) {
    // The real length is stored in the next two bytes (big endian).
    if (buffer.length < cursor + 2) {
      return needMoreBytes();
    }
    payloadLength = buffer.readUInt16BE(cursor);
    cursor += 2;
  }
  let maskOffset = -1;
  if (isMasked) {
    if (buffer.length < cursor + 4) {
      return needMoreBytes();
    }
    maskOffset = cursor;
    cursor += 4;
  }
  if (buffer.length < cursor + payloadLength) {
    console.log('[misalignment between WebSocket frame and NodeJs Buffer]\n');
    return needMoreBytes();
  }
  const payload = Buffer.alloc(payloadLength);
  payload.set(buffer.subarray(cursor, cursor + payloadLength));
  if (isMasked) {
    // Payload byte i is XOR-ed with masking key byte (i mod 4).
    for (let i = 0; i < payloadLength; i++) {
      payload[i] ^= buffer.readUInt8(maskOffset + (i % 4));
    }
  }
  cursor += payloadLength;
  const bufferRemainingBytes = Buffer.alloc(buffer.length - cursor);
  bufferRemainingBytes.set(buffer.subarray(cursor));
  return { payload, bufferRemainingBytes };
};
@guest271314
Copy link

Thanks for this! I've got a slightly modified version of your code working as a WebSocket server in the browser using Direct Sockets TCPServerSocket. The part missing now is how to send a normal close frame from the server to the client. Did you ever figure out how to construct a close frame?

@robertrypula
Copy link
Author

I'm glad you like it :) Unfortunately I never continued more than this code on this topic :/ Can you share your code with TCPServerSocket? I'm curious.

@guest271314
Copy link

Sure. Here's the original HTTP server https://gist.github.com/guest271314/73a50e9ebc6acaaff5d39f6fc7918ebf.

This is an Isolated Web App (see https://github.com/guest271314/webbundle and https://github.com/guest271314/telnet-client) that I launch using window.open() from arbitrary Web pages with

var w = window.open("isolated-app://<ID>?TCPServerSocket")

and using this in console on the arbitrary Web page


// Client-side test snippet meant to be pasted into the DevTools console of a web page:
// it opens a WebSocketStream to ws://0.0.0.0:8080, logs every received chunk and sends "X".
// It uses top-level await, and `var` declarations throughout.
var wss = new WebSocketStream('ws://0.0.0.0:8080');
console.log(wss);
// Handler attached to `closed` so that a rejection of that promise is ignored.
wss.closed.catch((e) => {});

wss.opened.catch((e) => {
  console.log(e);
});

// NOTE(review): if `opened` rejects, console.error returns undefined and this destructuring throws.
var {
  readable,
  writable
} = await wss.opened.catch(console.error);
var now;
var writer = writable.getWriter();
// `signal` is handed to pipeTo() below; `abortable` is not used anywhere else in this snippet.
var abortable = new AbortController();
var controller;
var {
  signal
} = abortable;
//writer.closed.then(() => console.log('writer closed')).catch(() => console.log('writer closed error'));
let minutes = 0;
// .pipeThrough(new TextDecoderStream())
// Log every incoming chunk both raw and decoded as text.
readable.pipeTo(
  new WritableStream({
    async start(c) {
      console.log(c);
      return controller = c;
    },
    write(v) {
      // `decoder` is declared further down in this snippet, after this pipeTo() call.
      console.log(v, decoder.decode(v));
    },
    close() {
      console.log('Socket closed');
    },
    abort(reason) {

    }
  }), {
    signal
  }).then(() => console.log('pipeThrough, pipeTo Promise')).catch(() => console.log('Done'));


var encoder = new TextEncoder();
const decoder = new TextDecoder();
var enc = (text) => encoder.encode(text);
// Send a single message containing the text "X".
await writer.write(enc("X"))

index.html

<!doctype html>
<html>

<head>
  <meta charset="utf-8">
  <link rel="manifest" href="manifest.webmanifest">
  <title>TCPServerSocket</title>
  <!-- Import map: resolves the bare module specifier "Buffer" to the local bundle file. -->
  <script type="importmap">
    {
      "imports": {
        "Buffer": "./buffer-bun-bundle.js"
      }
    }
  </script>
</head>

<body style="white-space:pre;font-family:monospace;">
  <!-- The page has no markup of its own; it only loads script.js as an ES module. -->
  <script type="module" src="script.js"></script>
</body>

</html>

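manifest.webmanifest (note: the snippet pasted below repeats index.html rather than showing the manifest itself). Since you are using Node.js' Buffer, I import it from a bundle that I happen to have already built here: https://gist.github.com/guest271314/08b19ba88c98a465dd09bcd8a04606f6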

<!doctype html>
<!-- NOTE(review): this snippet is introduced as manifest.webmanifest, but its content is an HTML page identical to index.html. -->
<html>

<head>
  <meta charset="utf-8">
  <link rel="manifest" href="manifest.webmanifest">
  <title>TCPServerSocket</title>
  <!-- Import map: resolves the bare module specifier "Buffer" to the local bundle file. -->
  <script type="importmap">
    {
      "imports": {
        "Buffer": "./buffer-bun-bundle.js"
      }
    }
  </script>
</head>

<body style="white-space:pre;font-family:monospace;">
  <script type="module" src="script.js"></script>
</body>

</html>

script.js (W.I.P.). For now I'm aborting the request instead of closing the WebSocket connection normally

import { Buffer } from "Buffer";

// Make the lower-case "Uint" spelling available as an alias of readBigUInt64BE
// (presumably missing from the bundled Buffer — TODO confirm it is still needed).
Buffer.prototype.readBigUint64BE = Buffer.prototype.readBigUInt64BE;
// Publish the imported Buffer on globalThis as well.
globalThis.Buffer = Buffer;

console.log(Buffer);

// https://gist.github.com/robertrypula/b813ffe23a9489bae1b677f1608676c8
// Debug helper: logs a label, the buffer length ("---" when no buffer is given) and the buffer itself.
const debugBuffer = (bufferName, buffer) => {
  let shownLength = "---";
  if (buffer) {
    shownLength = buffer.length;
  }

  console.log(`:: DEBUG - ${bufferName} | ${shownLength} | `, buffer, "\n");
};

/*
  https://tools.ietf.org/html/rfc6455#section-5.2
    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-------+-+-------------+-------------------------------+
   |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
   |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
   |N|V|V|V|       |S|             |   (if payload len==126/127)   |
   | |1|2|3|       |K|             |                               |
   +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
   |     Extended payload length continued, if payload len == 127  |
   + - - - - - - - - - - - - - - - +-------------------------------+
   |                               |Masking-key, if MASK set to 1  |
   +-------------------------------+-------------------------------+
   | Masking-key (continued)       |          Payload Data         |
   +-------------------------------- - - - - - - - - - - - - - - - +
   :                     Payload Data continued ...                :
   + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
   |                     Payload Data continued ...                |
   +---------------------------------------------------------------+
  OpCode
    %x0 denotes a continuation frame
    %x1 denotes a text frame
    %x2 denotes a binary frame
    %x3–7 are reserved for further non-control frames
    %x8 denotes a connection close
    %x9 denotes a ping
    %xA denotes a pong
    %xB-F are reserved for further control frames
*/

// ---------------------------------------------------------

/**
 * Wraps `payload` (a Buffer shorter than 64 KiB) into a single unmasked, final binary frame.
 * Payloads of 126 bytes or more use the 16-bit extended length field.
 * Throws an Error when the payload is 64 KiB or longer.
 */
const createWebSocketFrame = (payload) => {
  const size = payload.length;
  const extendedLengthBytes = size >= 126 ? 2 : 0;
  const headerBytes = 2 + extendedLengthBytes;
  const frame = Buffer.alloc(headerBytes + size);

  if (size >= 2 ** 16) {
    throw new Error("Payload equal or bigger than 64 KiB is not supported");
  }

  frame.writeUInt8(0x82, 0); // FIN flag = 1, opcode = 2 (binary frame)
  if (extendedLengthBytes === 0) {
    frame.writeUInt8(size, 1);
  } else {
    frame.writeUInt8(126, 1); // 126 announces the 16-bit length that follows
    frame.writeUInt16BE(size, 2);
  }

  payload.copy(frame, headerBytes);

  return frame;
};

// ---------------------------------------------------------

/**
 * Tries to decode one WebSocket frame from the start of `buffer`.
 *
 * Result shapes:
 * - complete frame:   { payload: <unmasked bytes>, bufferRemainingBytes: <copy of the bytes after the frame> }
 * - incomplete frame: { payload: null, bufferRemainingBytes: buffer } (the very same input object)
 * - close frame:      { payload: null, bufferRemainingBytes: null }
 *
 * Throws an Error for opcodes other than binary / continuation / close and for the 64-bit length form.
 */
const getParsedBuffer = (buffer) => {
  const needMoreBytes = () => ({ payload: null, bufferRemainingBytes: buffer });

  if (buffer.length < 2) {
    return needMoreBytes();
  }

  const firstByte = buffer.readUInt8(0);
  const secondByte = buffer.readUInt8(1);
  const isFinalFrame = (firstByte & 0x80) !== 0;
  const opCode = firstByte & 0x0f;
  const isMasked = (secondByte & 0x80) !== 0; // https://security.stackexchange.com/questions/113297
  let payloadLength = secondByte & 0x7f;
  let cursor = 2;

  if (!isFinalFrame) {
    console.log("[not final frame detected]\n");
  }

  if (opCode === 0x8) {
    console.log("[connection close frame]\n");
    // TODO read payload, for example payload equal to <0x03 0xe9> means 1001:
    //   1001 indicates that an endpoint is "going away", such as a server
    //   going down or a browser having navigated away from a page.
    // More info here: https://tools.ietf.org/html/rfc6455#section-7.4
    return { payload: null, bufferRemainingBytes: null };
  }

  if (opCode !== 0x2 && opCode !== 0x0) {
    throw new Error("Only binary and continuation frames are supported");
  }

  if (payloadLength === 127) {
    throw new Error("Payload equal or bigger than 64 KiB is not supported");
  }

  if (payloadLength === 126) {
    // The real length is stored in the next two bytes (big endian).
    if (buffer.length < cursor + 2) {
      return needMoreBytes();
    }
    payloadLength = buffer.readUInt16BE(cursor);
    cursor += 2;
  }

  let maskOffset = -1;
  if (isMasked) {
    if (buffer.length < cursor + 4) {
      return needMoreBytes();
    }
    maskOffset = cursor;
    cursor += 4;
  }

  if (buffer.length < cursor + payloadLength) {
    console.log("[misalignment between WebSocket frame and NodeJs Buffer]\n");
    return needMoreBytes();
  }

  const payload = Buffer.alloc(payloadLength);
  payload.set(buffer.subarray(cursor, cursor + payloadLength));
  if (isMasked) {
    // Payload byte i is XOR-ed with masking key byte (i mod 4).
    for (let i = 0; i < payloadLength; i++) {
      payload[i] ^= buffer.readUInt8(maskOffset + (i % 4));
    }
  }
  cursor += payloadLength;

  const bufferRemainingBytes = Buffer.alloc(buffer.length - cursor);
  bufferRemainingBytes.set(buffer.subarray(cursor));

  return { payload, bufferRemainingBytes };
};

// https://stackoverflow.com/a/77398427
/**
 * Hashes `message` concatenated with the WebSocket handshake GUID and returns the
 * digest base64-encoded, i.e. the Sec-WebSocket-Accept value for a Sec-WebSocket-Key.
 *
 * Replaces the FileReader / data-URL conversion (https://stackoverflow.com/a/77398427):
 * that promise was only ever resolved from `onload`, so a read error left the caller
 * waiting forever, and it depended on Promise.withResolvers being available.
 *
 * @param {string} message value of the Sec-WebSocket-Key request header
 * @param {string} [algo="SHA-1"] algorithm name passed to crypto.subtle.digest
 * @returns {Promise<string>} base64 encoded digest
 */
async function digest(message, algo = "SHA-1") {
  const hash = await crypto.subtle.digest(
    algo,
    new TextEncoder().encode(
      `${message}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`,
    ),
  );
  // btoa() takes a string in which every character stands for one byte.
  return btoa(String.fromCharCode(...new Uint8Array(hash)));
}

/**
 * Parses the head of a raw HTTP request into a Headers object.
 *
 * Only the part before the first empty line is read, so a request body is never
 * mistaken for header lines. Each field is split at its FIRST colon, which keeps values
 * containing ": " intact and accepts fields without a space after the colon.
 * Lines that are not valid header fields are skipped instead of making `new Headers` throw.
 *
 * For backward compatibility the request line still contributes one entry that maps the
 * method to the protocol version (e.g. "GET" -> "HTTP/1.1"), now for any request target.
 *
 * @param {string} r raw request text
 * @returns {Headers} parsed header fields
 */
function getHeaders(r) {
  const headerName = /^[!#$%&'*+\-.^_`|~0-9A-Za-z]+$/;
  const [head = ""] = String(r ?? "").split(/\r?\n\r?\n/, 1);
  const arr = [];

  for (const line of head.split(/\r?\n/)) {
    const requestLine = /^(\S+)\s+\S+\s+(HTTP\/\S+)\s*$/.exec(line);
    if (arr.length === 0 && requestLine && headerName.test(requestLine[1])) {
      arr.push([requestLine[1], requestLine[2]]);
      continue;
    }
    const colonIndex = line.indexOf(":");
    if (colonIndex <= 0) {
      continue;
    }
    const name = line.slice(0, colonIndex).trim();
    if (headerName.test(name)) {
      arr.push([name, line.slice(colonIndex + 1).trim()]);
    }
  }

  console.log(arr);
  return new Headers(arr);
}

/**
 * Decodes the first WebSocket frame found in `buffer` and re-encodes its payload as an
 * outgoing (unmasked) frame.
 *
 * Returns null when getParsedBuffer yields no payload — that covers a close frame as well
 * as an incomplete frame. Only the first frame of the chunk is processed; any bytes after
 * it are discarded.
 */
function parseWebSocketFrame(buffer) {
  // Copy the chunk into a Buffer (presumably so a plain Uint8Array gains the Buffer read methods).
  const frameBytes = Buffer.concat([Buffer.alloc(0), buffer]);
  const parsedBuffer = getParsedBuffer(frameBytes);

  debugBuffer("buffer", buffer);
  debugBuffer("bufferToParse", frameBytes);
  debugBuffer("parsedBuffer.payload", parsedBuffer.payload);
  debugBuffer(
    "parsedBuffer.bufferRemainingBytes",
    parsedBuffer.bufferRemainingBytes,
  );

  if (parsedBuffer.payload === null) {
    return null;
  }

  console.log(0, parsedBuffer);
  return createWebSocketFrame(parsedBuffer.payload);
}

// Page entry point: opens a TCPServerSocket on 0.0.0.0:8080 and serves every accepted
// connection. Each received chunk is decoded as text and dispatched on its content:
// WebSocket frames are echoed back, while WebSocket upgrade requests, OPTIONS requests
// and POST/QUERY requests get hand-written HTTP responses.
onload = async () => {
  resizeTo(300, 200);
  const USER_AGENT = "Built with Deno/1.42.4";
  console.log(USER_AGENT);
  const encoder = new TextEncoder();
  const decoder = new TextDecoder();

  const encode = (text) => encoder.encode(text);

  const socket = new TCPServerSocket("0.0.0.0", {
    localPort: 8080,
  });

  const {
    readable: server,
    localAddress,
    localPort,
  } = await socket.opened;

  console.log({ server });

  
  // await
  // Handle multiple connections
  await server.pipeTo(
    new WritableStream({
      // Called once per accepted connection.
      async write(connection) {
        const {
          readable: client,
          writable,
          remoteAddress,
          remotePort,
        } = await connection.opened;
        console.log({ connection });
        const writer = writable.getWriter();
        console.log({
          remoteAddress,
          remotePort,
        });
        // Published on globalThis (presumably for inspection from the console); each new
        // connection overwrites the previous values.
        globalThis.client = client;
        globalThis.writer = writer;
        const abortable = new AbortController();
        const { signal } = abortable;
        // .pipeThrough(new TextDecoderStream())
        // NOTE(review): this await resolves only when the client stream finishes, so the outer
        // write() stays pending for the whole lifetime of the connection.
        await client.pipeTo(
          new WritableStream({
            start(controller) {             
              console.log(controller);
            },
            // Called for every chunk `r` received from the client.
            async write(r, controller) {
              const request = decoder.decode(r);
              console.log(request);
              // Anything that does not contain an HTTP method name is treated as a WebSocket frame.
              // NOTE(review): the pattern is unanchored and case-insensitive, so a frame whose bytes
              // happen to contain e.g. "get" is not handled by this branch.
              if (!/(GET|POST|HEAD|OPTIONS|QUERY)/i.test(request)) {
                console.log({ data: r });
                const response = parseWebSocketFrame(r);
                console.log({ response });
                // null is returned whenever no payload was parsed (see parseWebSocketFrame).
                if (response === null) {
                  //const Buffer.allocUnsafe(2);
                  //target[0] = 0x80;
                  //target.writeUInt16BE(0x88, 0);
                  // TODO Close without abort or error
                  //return abortable.abort("WebSocket closed");
                  //await writer.write(new Uint8Array([130, 1, 0x88]).buffer);
                  await writer.close();
                  await writer.closed;
                  return abortable.abort("WebSocket closed by client, aborted in server");
                  // return controller.abort();
                  // NOTE(review): everything below up to the end of this `if` is unreachable
                  // because of the return statement above.
                  try {
                    //console.log(await client.cancel("Reason"));
                  } catch (e) {
                    //abortable.abort("WebSocket closed");
                  } finally {
                    return;
                  }
                }
                // Echo the re-encoded frame back to the sender.
                await writer.write(response.buffer);
                // Don't close WebSocket
                // await writer.close();
              }
              // WebSocket handshake: answer with 101 and the computed Sec-WebSocket-Accept value.
              if (/^GET/.test(request) && /websocket/i.test(request)) {
                const headers = getHeaders(request);
                const key = headers.get("sec-websocket-key");
                const accept = await digest(key);
                await writer.write(
                  encode("HTTP/1.1 101 Switching Protocols\r\n"),
                );
                await writer.write(encode("Upgrade: websocket\r\n"));
                await writer.write(encode("Connection: Upgrade\r\n"));
                await writer.write(
                  encode(`Sec-WebSocket-Accept: ${accept}\r\n\r\n`),
                );
                // await writer.close();
              }
              // OPTIONS request: reply with the Access-Control-* headers and close the connection.
              // NOTE(review): Access-Control-Allow-Headers is written twice in this response.
              if (/^OPTIONS/.test(request)) {
                await writer.write(encode("HTTP/1.1 204 OK\r\n"));
                await writer.write(
                  encode(
                    "Access-Control-Allow-Headers: Access-Control-Request-Private-Network\r\n",
                  ),
                );
                await writer.write(
                  encode("Access-Control-Allow-Origin: *\r\n"),
                );
                await writer.write(
                  encode("Access-Control-Allow-Private-Network: true\r\n"),
                );
                await writer.write(
                  encode(
                    "Access-Control-Allow-Headers: Access-Control-Request-Private-Network\r\n\r\n",
                  ),
                );
                await writer.close();
              }
              // POST / QUERY: send the request body back upper-cased as one chunk of a chunked response.
              if (/^(POST|query)/i.test(request)) {
                // NOTE(review): match() returns null when nothing follows the blank line, which makes
                // this destructuring throw.
                const [body] = request.match(
                  /(?<=\r\n\r\n)[a-zA-Z\d\s\r\n-:;=]+/igm,
                );
                console.log({
                  body,
                });
                await writer.write(encode("HTTP/1.1 200 OK\r\n"));
                await writer.write(
                  encode("Content-Type: application/octet-stream\r\n"),
                );
                await writer.write(
                  encode("Access-Control-Allow-Origin: *\r\n"),
                );
                await writer.write(
                  encode("Access-Control-Allow-Private-Network: true\r\n"),
                );
                await writer.write(
                  encode(
                    "Access-Control-Allow-Headers: Access-Control-Request-Private-Network\r\n",
                  ),
                );
                await writer.write(encode("Cache-Control: no-cache\r\n"));
                await writer.write(encode("Connection: close\r\n"));
                await writer.write(
                  encode("Transfer-Encoding: chunked\r\n\r\n"),
                );

                // One chunk: hexadecimal size line, the data, then CRLF.
                const chunk = encode(body.toUpperCase());
                const size = chunk.buffer.byteLength.toString(16);
                await writer.write(encode(`${size}\r\n`));
                await writer.write(chunk.buffer);
                await writer.write(encode("\r\n"));
                /*
              const response = await fetch("https://gist.githubusercontent.com/guest271314/1e8fab96bd40dc7711b43f5d7faf239e/raw/246bf37c06ebcdca994ef04300c9ff6f8bf5c6cd/signals-proposal-bun-bundle.js");
              await response.body.pipeTo(
                new WritableStream({
                  async write(chunk) {
                    const size = chunk.buffer.byteLength.toString(16);
                    //console.log(chunk.buffer.byteLength, size);
                    await writer.write(encode(`${size}\r\n`));
                    await writer.write(chunk.buffer);
                    await writer.write(encode("\r\n"));
                  },
                  close() {
                    console.log("Stream closed");
                  },
                }),
              );
              */
                // Zero-size chunk terminates the chunked body.
                await writer.write(encode("0\r\n"));
                await writer.write(encode("\r\n"));
                await writer.close();
              }
            },
            close() {
              console.log("Client closed");
            },
            abort(reason) {
              console.log(reason);
            },
          })
        , {signal}).catch(console.warn);
      },
      close() {
        console.log("Host closed");
      },
      abort(reason) {
        console.log("Host aborted", reason);
      },
    }),
  ).then(() => console.log("Server closed")).catch(console.warn);
};

@guest271314
Copy link

@robertrypula This works for normal close. From https://stackoverflow.com/a/17177146

new Uint8Array([136, 0]).buffer

@guest271314
Copy link

Substituting ArrayBuffer, Uint8Array, DataView for Node.js Buffer

// Debug helper: logs a label, the buffer length ("---" when no buffer is given) and the buffer itself.
const debugBuffer = (bufferName, buffer) => {
  let shownLength = "---";
  if (buffer) {
    shownLength = buffer.length;
  }
  console.log(`:: DEBUG - ${bufferName} | ${shownLength} | `, buffer, "\n");
};

/*
  https://tools.ietf.org/html/rfc6455#section-5.2
    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-------+-+-------------+-------------------------------+
   |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
   |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
   |N|V|V|V|       |S|             |   (if payload len==126/127)   |
   | |1|2|3|       |K|             |                               |
   +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
   |     Extended payload length continued, if payload len == 127  |
   + - - - - - - - - - - - - - - - +-------------------------------+
   |                               |Masking-key, if MASK set to 1  |
   +-------------------------------+-------------------------------+
   | Masking-key (continued)       |          Payload Data         |
   +-------------------------------- - - - - - - - - - - - - - - - +
   :                     Payload Data continued ...                :
   + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
   |                     Payload Data continued ...                |
   +---------------------------------------------------------------+
  OpCode
    %x0 denotes a continuation frame
    %x1 denotes a text frame
    %x2 denotes a binary frame
    %x3–7 are reserved for further non-control frames
    %x8 denotes a connection close
    %x9 denotes a ping
    %xA denotes a pong
    %xB-F are reserved for further control frames
*/

// ---------------------------------------------------------

/**
 * Wraps `payload` (bytes, shorter than 64 KiB) into a single unmasked, final binary frame
 * and returns it as an ArrayBuffer. Payloads of 126 bytes or more use the 16-bit
 * extended length field. Throws an Error when the payload is 64 KiB or longer.
 */
const createWebSocketFrame = (payload) => {
  const size = payload.length;
  const extendedLengthBytes = size >= 126 ? 2 : 0;
  const headerBytes = 2 + extendedLengthBytes;
  const frame = new ArrayBuffer(headerBytes + size);
  const header = new DataView(frame);

  if (size >= 2 ** 16) {
    throw new Error("Payload equal or bigger than 64 KiB is not supported");
  }

  header.setUint8(0, 0x82); // FIN flag = 1, opcode = 2 (binary frame)
  if (extendedLengthBytes === 0) {
    header.setUint8(1, size);
  } else {
    header.setUint8(1, 126); // 126 announces the 16-bit length that follows
    header.setUint16(2, size);
  }

  // Copy the payload bytes right behind the header.
  new Uint8Array(frame, headerBytes).set(payload);
  return frame;
};

// ---------------------------------------------------------

/**
 * Tries to decode one WebSocket frame from the start of `buffer` (a Uint8Array).
 *
 * Result shapes:
 * - complete frame:   { payload: <unmasked bytes>, bufferRemainingBytes: <copy of the bytes after the frame> }
 * - incomplete frame: { payload: null, bufferRemainingBytes: buffer } (the very same input object)
 * - close frame:      { payload: null, bufferRemainingBytes: null }
 *
 * Throws an Error for opcodes other than binary / continuation / close and for the 64-bit length form.
 */
const getParsedBuffer = (buffer) => {
  console.log({
    buffer
  });
  // Honour byteOffset / byteLength: `buffer` may be a view into a larger ArrayBuffer,
  // in which case a DataView over the whole ArrayBuffer would read the wrong bytes.
  const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);
  const needMoreBytes = () => ({
    payload: null,
    bufferRemainingBytes: buffer
  });
  let currentOffset = 0;

  if (currentOffset + 2 > buffer.length) {
    return needMoreBytes();
  }

  const firstByte = view.getUint8(currentOffset++);
  const secondByte = view.getUint8(currentOffset++);
  const isFinalFrame = !!((firstByte >>> 7) & 0x1);
  const opCode = firstByte & 0xf;
  const isMasked = !!((secondByte >>> 7) & 0x1); // https://security.stackexchange.com/questions/113297
  let payloadLength = secondByte & 0x7f;

  if (!isFinalFrame) {
    console.log("[not final frame detected]\n");
  }

  if (opCode === 0x8) {
    console.log("[connection close frame]\n");
    // TODO read payload, for example payload equal to <0x03 0xe9> means 1001:
    //   1001 indicates that an endpoint is "going away", such as a server
    //   going down or a browser having navigated away from a page.
    // More info here: https://tools.ietf.org/html/rfc6455#section-7.4
    return {
      payload: null,
      bufferRemainingBytes: null
    };
  }

  if (opCode !== 0x2 && opCode !== 0x0) {
    throw new Error("Only binary and continuation frames are supported");
  }

  if (payloadLength > 125) {
    if (payloadLength === 126) {
      // The real length is stored in the next two bytes (big endian).
      if (currentOffset + 2 > buffer.length) {
        return needMoreBytes();
      }
      payloadLength = view.getUint16(currentOffset);
      currentOffset += 2;
    } else {
      throw new Error("Payload equal or bigger than 64 KiB is not supported");
    }
  }

  let maskOffset = -1;
  if (isMasked) {
    if (currentOffset + 4 > buffer.length) {
      return needMoreBytes();
    }
    maskOffset = currentOffset;
    currentOffset += 4;
  }

  if (currentOffset + payloadLength > buffer.length) {
    console.log("[misalignment between WebSocket frame and NodeJs Buffer]\n");
    return needMoreBytes();
  }

  // new Uint8Array(typedArray) copies, so the results never alias the input.
  const payload = new Uint8Array(
    buffer.subarray(currentOffset, currentOffset + payloadLength),
  );
  if (isMasked) {
    // Payload byte i is XOR-ed with masking key byte (i mod 4).
    for (let i = 0; i < payloadLength; i++) {
      payload[i] ^= view.getUint8(maskOffset + (i % 4));
    }
  }
  currentOffset += payloadLength;

  const bufferRemainingBytes = new Uint8Array(buffer.subarray(currentOffset));

  return {
    payload,
    bufferRemainingBytes
  };
};

/**
 * Decodes the first WebSocket frame found in `buffer` and re-encodes its payload as an
 * outgoing (unmasked) frame.
 *
 * Returns null when getParsedBuffer yields no payload — that covers a close frame as well
 * as an incomplete frame. Only the first frame of the chunk is processed; any bytes after
 * it are discarded.
 */
function parseWebSocketFrame(buffer) {
  const parsedBuffer = getParsedBuffer(buffer);

  debugBuffer("buffer", buffer);
  debugBuffer("bufferToParse", buffer);
  debugBuffer("parsedBuffer.payload", parsedBuffer.payload);
  debugBuffer(
    "parsedBuffer.bufferRemainingBytes",
    parsedBuffer.bufferRemainingBytes,
  );

  if (parsedBuffer.payload === null) {
    return null;
  }

  console.log(parsedBuffer);
  return createWebSocketFrame(parsedBuffer.payload);
}

// https://stackoverflow.com/a/77398427
/**
 * Hashes `message` concatenated with the WebSocket handshake GUID and returns the
 * digest base64-encoded, i.e. the Sec-WebSocket-Accept value for a Sec-WebSocket-Key.
 *
 * Replaces the FileReader / data-URL conversion (https://stackoverflow.com/a/77398427):
 * that promise was only ever resolved from `onload`, so a read error left the caller
 * waiting forever, and it depended on Promise.withResolvers being available.
 *
 * @param {string} message value of the Sec-WebSocket-Key request header
 * @param {string} [algo="SHA-1"] algorithm name passed to crypto.subtle.digest
 * @returns {Promise<string>} base64 encoded digest
 */
async function digest(message, algo = "SHA-1") {
  const hash = await crypto.subtle.digest(
    algo,
    new TextEncoder().encode(
      `${message}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`,
    ),
  );
  // btoa() takes a string in which every character stands for one byte.
  return btoa(String.fromCharCode(...new Uint8Array(hash)));
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment