Skip to content

Instantly share code, notes, and snippets.

@nestarz
Last active February 22, 2023 17:18
Show Gist options
  • Save nestarz/c0592f7186a138c4cd85f78492b267cd to your computer and use it in GitHub Desktop.
Save nestarz/c0592f7186a138c4cd85f78492b267cd to your computer and use it in GitHub Desktop.
websocketstream-polyfill.js
/**
 * Builds a drop-in replacement for a native `ReadableStream` constructor whose
 * instances lack async iteration.
 *
 * Each instance created through the returned constructor is a genuine native
 * stream that additionally exposes `[Symbol.asyncIterator]()` / `getIterator()`
 * and a patched `cancel()` that first releases readers held by those iterators
 * (a locked stream cannot otherwise be cancelled).
 *
 * @param {typeof globalThis.ReadableStream} NativeReadableStream - The native
 *   constructor to wrap.
 * @returns {Function} A constructor taking the same arguments as the native one.
 */
function createAsyncIterableReadableStream(NativeReadableStream) {
  return function ReadableStream(...args) {
    const stream = new NativeReadableStream(...args);
    // Readers currently held by iterators created from this stream.
    const activeReaders = new Set();
    const nativeCancel = stream.cancel;

    // Patch cancel to release the iterator locks before cancelling the stream.
    stream.cancel = function cancel(...cancelArgs) {
      const pending = [...activeReaders].map((reader) => {
        // Forward the reason: this call is the one that reaches the underlying
        // source, the native cancel below then sees an already-closed stream.
        const cancelled = reader.cancel(...cancelArgs);
        reader.releaseLock();
        return cancelled;
      });
      activeReaders.clear();
      pending.push(nativeCancel.apply(this, cancelArgs));
      // Mirror the native contract: a promise that fulfils with undefined.
      return Promise.all(pending).then(() => undefined);
    };

    const getIterator = function getIterator() {
      const reader = stream.getReader();
      activeReaders.add(reader);
      let isFinished = false;

      // Releases the lock exactly once, whatever path ends the iteration.
      const finish = () => {
        if (isFinished) {
          return;
        }
        isFinished = true;
        activeReaders.delete(reader);
        reader.releaseLock();
      };

      return {
        next() {
          if (isFinished) {
            return Promise.resolve({ done: true, value: undefined });
          }
          return reader.read().then(
            (result) => {
              // Unlock the stream once it is exhausted, as native iterators do.
              if (result.done) {
                finish();
              }
              return result;
            },
            (error) => {
              finish();
              throw error;
            },
          );
        },
        return(value) {
          finish();
          return Promise.resolve({ done: true, value });
        },
        [Symbol.asyncIterator]() {
          return this;
        },
      };
    };

    stream[Symbol.asyncIterator] = getIterator;
    stream.getIterator = getIterator;
    return stream;
  };
}

/**
 * `ReadableStream` with async-iteration support: the native constructor when it
 * already implements `Symbol.asyncIterator`, otherwise a thin wrapper that adds
 * it. `globalThis` is used so the module also loads outside of a `window`.
 */
export const ReadableStream =
  globalThis.ReadableStream?.prototype[Symbol.asyncIterator]
    ? globalThis.ReadableStream
    : createAsyncIterableReadableStream(globalThis.ReadableStream);
/**
 * Minimal polyfill of the `WebSocketStream` API on top of a classic
 * `WebSocket`, exposing the connection as a readable/writable stream pair.
 */
export class WebSocketStream {
  /** URL the socket was opened with. */
  url;
  /**
   * Promise fulfilled on open with `{ readable, writable, protocol, extensions }`;
   * rejected with the `error` event if the socket errors before opening.
   */
  connection;
  /**
   * Promise fulfilled with `{ code, reason }` when the socket closes;
   * rejected with the `error` event if the socket reports an error first.
   */
  closed;
  /** `close({ code, reason })` — closes the underlying socket. */
  close;

  /**
   * @param {string} url - WebSocket URL to connect to.
   * @param {object} [options]
   * @param {string|string[]} [options.protocols] - Sub-protocols to request.
   * @param {AbortSignal} [options.signal] - Closes the socket when aborted.
   * @throws {DOMException} `AbortError` when `options.signal` is already aborted.
   */
  constructor(url, options = {}) {
    if (options.signal?.aborted) {
      throw new DOMException("This operation was aborted", "AbortError");
    }
    this.url = url;
    const ws = new WebSocket(url, options.protocols ?? []);
    ws.binaryType = "arraybuffer";

    // Accepts a missing or null close-info object so a bare `close()` (or a
    // stream cancel without a reason) cannot fail on destructuring.
    const closeWithInfo = (closeInfo) => {
      const { code, reason } = closeInfo ?? {};
      ws.close(code, reason);
    };

    this.connection = new Promise((resolve, reject) => {
      ws.onopen = () => {
        resolve({
          readable: new ReadableStream({
            start(controller) {
              ws.onmessage = ({ data }) => controller.enqueue(data);
              ws.onerror = (e) => controller.error(e);
              // End the readable side when the socket closes; otherwise
              // consumers would wait forever for a chunk that never arrives.
              ws.addEventListener("close", () => {
                try {
                  controller.close();
                } catch {
                  // The stream was already cancelled or errored: nothing to close.
                }
              });
            },
            cancel(closeInfo) {
              // Stop feeding a cancelled stream: enqueue() would throw on it.
              ws.onmessage = null;
              closeWithInfo(closeInfo);
            },
          }),
          writable: new WritableStream({
            write(chunk) {
              ws.send(chunk);
            },
            abort() {
              ws.close();
            },
            close: closeWithInfo,
          }),
          protocol: ws.protocol,
          extensions: ws.extensions,
        });
        ws.removeEventListener("error", reject);
      };
      ws.addEventListener("error", reject);
    });

    this.closed = new Promise((resolve, reject) => {
      ws.onclose = ({ code, reason }) => {
        resolve({ code, reason });
        ws.removeEventListener("error", reject);
      };
      ws.addEventListener("error", reject);
    });

    const { signal } = options;
    if (signal) {
      // Use a listener rather than `signal.onabort` so a handler the caller
      // already installed on their own signal is left untouched.
      const abortSocket = () => ws.close();
      signal.addEventListener("abort", abortSocket, { once: true });
      ws.addEventListener("close", () => {
        signal.removeEventListener("abort", abortSocket);
      });
    }

    this.close = closeWithInfo;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment