Skip to content

Instantly share code, notes, and snippets.

@friendlyanon
Created November 23, 2018 16:47
Show Gist options
  • Save friendlyanon/f3bf2282e27caabbf2816a46eb30cb47 to your computer and use it in GitHub Desktop.
Save friendlyanon/f3bf2282e27caabbf2816a46eb30cb47 to your computer and use it in GitHub Desktop.
"use strict";
const fs = require("fs");
// https://gist.github.com/friendlyanon/afd5a18811c540bc8bcf3f40560d4c74
const Queue = require("./queue");
/**
 * Node-style `(error, result)` callback shared by the thenable helpers.
 *
 * It must be invoked with `this` set to the thenable (the classes in this
 * file bind it in their constructors). The pending `resolve`/`reject` pair is
 * picked up first, the per-operation state (`data`, `resolve`, `reject`) is
 * cleared, and only then is the thenable settled.
 *
 * @param {*} error  Truthy value rejects the pending operation.
 * @param {*} result Value used to resolve when `error` is falsy.
 */
function nodeCb(error, result) {
  const settle = error ? this.reject : this.resolve;
  const outcome = error ? error : result;

  // Clear the state before settling so the object can be reused right away.
  this.reject = null;
  this.resolve = null;
  this.data = null;

  settle(outcome);
}
/**
 * Thenable wrapper around `http.get` / `https.get`.
 *
 * Awaiting an instance issues the GET request and resolves with whatever the
 * agent passes to its response callback. The agent is looked up on the
 * instance by protocol name (`this.http` / `this.https`), so those properties
 * must be available on the prototype.
 */
class GetRequest {
  /**
   * @param {string} url       Absolute URL starting with `http:` or `https:`.
   * @param {object} [options] Forwarded unchanged to the agent's `get`.
   * @throws {TypeError} When the URL has no `http`/`https` scheme.
   */
  constructor(url, options = {}) {
    // Without a colon `indexOf` yields -1 and `slice(0, -1)` would merely chop
    // the last character, letting strings such as "httpx" pass as "http".
    const colon = url.indexOf(":");
    const protocol = colon === -1 ? "" : url.slice(0, colon);
    switch (protocol) {
      case "http": case "https": this.agent = this[protocol]; break;
      default: throw new TypeError("Unknown protocol");
    }
    this.url = url;
    this.options = options;
  }

  /**
   * Starts the request.
   *
   * @param {Function} resolve  Called with the response.
   * @param {Function} [reject] Called with any `'error'` emitted by the
   *   request object returned from the agent's `get`.
   */
  then(resolve, reject) {
    const request = this.agent.get(this.url, this.options, resolve);
    // An 'error' event without a listener is thrown as an uncaught exception
    // and would leave the awaiting caller pending forever.
    if (typeof reject === "function") request.on("error", reject);
  }
} // class GetRequest
// Expose the protocol modules on the prototype so the constructor can pick the
// agent by scheme name (`this[protocol]`). The properties are non-enumerable
// but writable and configurable, so they can still be replaced.
for (const [protocol, agent] of [
  ["https", require("https")],
  ["http", require("http")],
]) {
  Object.defineProperty(GetRequest.prototype, protocol, {
    value: agent,
    writable: true,
    configurable: true,
  });
}
/**
 * Thenable that pipes every `'data'` chunk of an emitter into a handler.
 *
 * The listeners are registered unbound, so inside them `this` is the emitter
 * rather than the forwarder; the forwarder is reached through the
 * `__dataforwarder` back-reference stored on the emitter by the constructor.
 */
class DataForwarder {
  /**
   * @param {*} fd            Passed as first argument to `handler`.
   * @param {object} request  Emitter producing `'data'`, `'end'`, `'error'`.
   * @param {Function} handler Invoked as `handler(fd, chunk)` per chunk.
   */
  constructor(fd, request, handler) {
    this.fd = fd;
    this.request = request;
    request.__dataforwarder = this;
    this.handler = handler;
  }

  /**
   * Subscribes to the emitter; resolves on `'end'`, rejects on `'error'`.
   */
  then(resolve, reject) {
    Object.assign(this, { resolve, reject });
    const subscriptions = [
      ["error", this.onError],
      ["data", this.onData],
      ["end", this.onEnd],
    ];
    // Chain on the return value of `.on`, one event after another.
    subscriptions.reduce(
      (emitter, [event, listener]) => emitter.on(event, listener),
      this.request
    );
  }

  /** `'data'` listener: hands the chunk to the handler as a plain call. */
  onData(chunk) {
    const forwarder = this.__dataforwarder;
    const forward = forwarder.handler;
    const descriptor = forwarder.fd;
    forward(descriptor, chunk);
  }

  /** `'error'` listener: rejects the owning forwarder. */
  onError(error) {
    const forwarder = this.__dataforwarder;
    forwarder.reject(error);
  }

  /** `'end'` listener: resolves the owning forwarder with no value. */
  onEnd() {
    const forwarder = this.__dataforwarder;
    forwarder.resolve();
  }
} // class DataForwarder
/**
 * Reusable thenable around `fs.write`.
 *
 * Usage: `setup(fd)` once, then `await writer.write(chunk)` per chunk. The
 * bound `nodeCb` settles the pending operation and clears `data`, `resolve`
 * and `reject`, which is what allows the same instance to be awaited again.
 */
class DataWriter {
  constructor() {
    // Bound once so no new callback is created per write.
    this.callback = nodeCb.bind(this);
  }

  /** Remembers the file descriptor that subsequent writes target. */
  setup(fd) {
    this.fd = fd;
  }

  /**
   * Stores the chunk for the next write.
   * @returns {DataWriter} This instance, so the call can be awaited directly.
   */
  write(data) {
    return Object.assign(this, { data });
  }

  /** Performs the write; settled through the bound node-style callback. */
  then(resolve, reject) {
    Object.assign(this, { resolve, reject });
    const { fd, data, callback } = this;
    fs.write(fd, data, callback);
  }
} // class DataWriter
/**
 * Reusable thenable around `fs.open` and `fs.close`.
 *
 * `setup(...)` arms an open, `close(fd)` arms a close; awaiting the instance
 * afterwards runs the armed `fs` call and settles through the bound `nodeCb`.
 */
class GetFileDescriptor {
  constructor() {
    this.method = "open";
    // Bound once; reused for every open/close issued through this instance.
    this.callback = nodeCb.bind(this);
  }

  /**
   * Arms an `fs.open` call.
   * @param {...*} args Arguments placed before the callback.
   * @returns {GetFileDescriptor} This instance, ready to be awaited.
   */
  setup(...args) {
    return Object.assign(this, { args, method: "open" });
  }

  /**
   * Arms an `fs.close` call for the given descriptor.
   * @returns {GetFileDescriptor} This instance, ready to be awaited.
   */
  close(fd) {
    return Object.assign(this, { args: [fd], method: "close" });
  }

  /** Runs the armed `fs` method with the stored arguments. */
  then(resolve, reject) {
    Object.assign(this, { resolve, reject });
    const { method, args, callback } = this;
    fs[method](...args, callback);
  }
} // class GetFileDescriptor
/**
 * Thenable that downloads a URL and writes the response body into a file.
 *
 * Awaiting an instance opens `options.path`, issues a GET for `options.url`,
 * writes every received chunk in arrival order and closes the descriptor
 * after all queued chunks have been written. The instance can be awaited
 * again once a run has finished; awaiting it while a run is in progress
 * rejects.
 */
class StreamIntoFile {
  /**
   * @param {object} options
   * @param {string} options.path      File to open.
   * @param {string} options.url       `http:`/`https:` URL to fetch.
   * @param {string} [options.flags]   Open flags, `"a"` unless overridden.
   * @param {*} [options.mode]         Forwarded to the open call.
   * @param {object} [options.options] Forwarded to the GET request.
   */
  constructor(options) {
    this._opener = new GetFileDescriptor;
    this._writer = new DataWriter;
    this._forwarderCallback = this._flush.bind(this);
    this._running = false;
    this._draining = null;
    this._writeError = null;
    this._options = {
      flags: "a",
      ...options
    };
  }

  /**
   * Queues a chunk and makes sure a drain loop is writing the queue out.
   *
   * @param {*} fd    Descriptor the chunks are written to.
   * @param {*} chunk Data to append.
   * @returns {Promise<void>} Settles when the active drain loop has finished;
   *   never rejects (write failures are recorded in `_writeError`).
   */
  async _flush(fd, chunk) {
    // After a failed write the file is already incomplete; stop writing.
    if (this._writeError != null) return;
    this._buffer.enqueue(chunk);
    if (!this._running) this._draining = this._drain(fd);
    return this._draining;
  }

  /**
   * Writes queued chunks one at a time until the queue is empty or a write
   * fails. Only one drain loop runs at a time (guarded by `_running`).
   */
  async _drain(fd) {
    const { _buffer, _writer } = this;
    this._running = true;
    _writer.setup(fd);
    try {
      while (_buffer.size()) {
        await _writer.write(_buffer.dequeue());
      }
    }
    catch (e) {
      // Keep the first failure so the awaiting caller gets rejected with it.
      if (this._writeError == null) this._writeError = e;
    }
    finally {
      this._running = false;
    }
  }

  /**
   * Runs the download.
   *
   * Resolves with no value on success. Rejects with the first error that
   * occurred while opening, requesting, receiving, writing or closing.
   */
  async then(resolve, reject) {
    if (this._inUse) return reject(new Error("This object is already in use"));
    let error;
    this._inUse = true;
    this._buffer = new Queue;
    this._offset = 0;
    this._running = false;
    this._draining = null;
    this._writeError = null;
    try {
      const opts = this._options;
      const fd = await this._opener.setup(opts.path, opts.flags, opts.mode);
      try {
        const request = await new GetRequest(opts.url, opts.options);
        await new DataForwarder(fd, request, this._forwarderCallback);
      }
      catch (e) { error = e; }
      finally {
        // The forwarder settling only means the last chunk was queued, not
        // written: wait for the drain loop before the descriptor goes away.
        while (this._running) await this._draining;
        if (error == null && this._writeError != null) error = this._writeError;
        try { await this._opener.close(fd); }
        // A close failure must not mask the error that caused the abort.
        catch (e) { if (error == null) error = e; }
      }
    }
    catch (e) { error = e; }
    this._inUse = false;
    return error != null ? reject(error) : resolve();
  }
} // class StreamIntoFile
// The module's only export is the StreamIntoFile class.
module.exports = StreamIntoFile;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment