Skip to content

Instantly share code, notes, and snippets.

@csabatuz
Created September 19, 2018 22:00
Show Gist options
  • Save csabatuz/b21e410d9dd463a27e64657e1b92fb97 to your computer and use it in GitHub Desktop.
Save csabatuz/b21e410d9dd463a27e64657e1b92fb97 to your computer and use it in GitHub Desktop.
"use strict";
import * as BrowserFS from "browserfs";
const IO_LENGTH = 52428800;
const INPUT_TYPE = {
STDIN: "stdin",
WORKERFS: "workerfs",
};
const OUTPUT_TYPE = {
STDOUT: "stdout",
MEMFS: "memfs",
IDBFS: "idbfs",
HTML5: "html5",
};
const getInt8 = (b) => {
if (b > Math.pow(2, 7) - 1) {
return b - Math.pow(2, 8);
}
return b;
};
const processConfig = (config, file) => {
switch (config.inputForm) {
case INPUT_TYPE.WORKERFS:
config.infile = `/data/${file.name}`;
break;
case INPUT_TYPE.STDIN:
config.infile = "/dev/stdin";
break;
}
switch (config.outputForm) {
case OUTPUT_TYPE.STDOUT:
config.outfile = "/dev/stdout";
break;
case OUTPUT_TYPE.MEMFS:
config.outfile = `/work/${file.name}`;
break;
case OUTPUT_TYPE.IDBFS:
config.outfile = `/idbfs/${file.name}`;
break;
case OUTPUT_TYPE.HTML5:
config.outfile = `/html5fs/${file.name}`;
}
};
const execute = (config, runner, args, stream, file, worker) => {
let inErrorState;
let inputPos = 0;
const totalSize = file.size;
// Drives to mount
const mounts = [];
const mountPromise = new Promise((resolve, reject) => {
if (config.outputForm === OUTPUT_TYPE.IDBFS || config.inputForm === INPUT_TYPE.IDBFS) {
mounts.push({type: "IDBFS", opts: {}, mountpoint: "/idbfs"});
}
if (config.outputForm === OUTPUT_TYPE.MEMFS) {
mounts.push({type: "MEMFS", opts: {}, mountpoint: "/output"});
}
if (config.inputForm === INPUT_TYPE.WORKERFS) {
mounts.push({type: "WORKERFS", opts: {files: [file]}, mountpoint: "/data"});
}
if (config.outputForm === OUTPUT_TYPE.HTML5) {
BrowserFS.configure({
fs: "AsyncMirror",
options: {
sync: {
fs: "InMemory",
},
async: {
fs: "WorkerFS",
options: {worker},
},
},
}, function(e) {
if (e) {
console.log(e);
throw e;
}
mounts.push({fs: new BrowserFS.EmscriptenFS(), opts: {}, mountpoint: "/html5fs"});
resolve();
});
} else {
resolve();
}
});
/**
* Gets a single byte written to stdout.
*
* @param {number} byte
*/
const stdout = config.outputForm !== OUTPUT_TYPE.STDOUT ? () => {} : (byte) => {
if ("number" === typeof byte) {
stream.processData(byte);
}
};
/**
*
* Expected to return the next char in stdin.
* - `null` means EOF
* - undefined means I/O wait
* - byte value means char on stdin
*
* @return {number} a byte, null, or undefined
*/
const stdin = () => {
if (config.inputForm !== INPUT_TYPE.STDIN) {
return undefined;
}
// Read a new file slice if needed
if (stream.readyForData() && inputPos < totalSize) {
const slice = file.slice(inputPos, Math.min(totalSize, inputPos + IO_LENGTH));
const reader = new FileReaderSync();
const buf = reader.readAsArrayBuffer(slice);
stream.write(new Buffer(buf));
inputPos += IO_LENGTH;
}
// get the first char in the input buffer (FIFO)
const c = stream.nextInputChar();
if (inputPos >= totalSize && c === undefined) {
return null;
}
return c;
};
return mountPromise.then(() => {
const result = runner({
mounts,
arguments: args,
stdout,
stdin,
onExit(code) {
// this stream should be end()-ed and closed off
if (0 !== code) {
inErrorState = true;
}
console.log("EMScripten operation finished with code: ", code);
},
});
if (config.outputForm !== OUTPUT_TYPE.STDOUT) {
if (!inErrorState) {
let view = config.idbfs ? new Int8Array(result.IDBFS[0].data) : result.MEMFS[0].data;
for (let i = 0; i < view.length; ++i) {
stream.processData(getInt8(view[i]));
}
}
}
stream.end();
return inErrorState;
});
};
module.exports = {
INPUT_TYPE,
OUTPUT_TYPE,
processConfig,
execute,
};
"use strict";
/* global Atomics */
import stream from "stream";
class EMScriptenStream extends stream.Readable {
constructor(options, worker) {
options.objectMode = true;
super(options);
this._worker = worker;
this._backpressure = false;
this._chunks = [];
this._wants_data = false;
this._worker.addEventListener("message", (e) => {
if ("chunk" === e.data.type) {
this._chunks.push(e.data.chunk);
if (this._wants_data) {
const chunk = this._chunks[0];
this._chunks = this._chunks.slice(1);
this._wants_data = this.push(chunk);
}
}
});
}
_read(size) {
let go = true;
while (0 !== this._chunks.length && go) {
const chunk = this._chunks[0];
this._chunks = this._chunks.slice(1);
go = this.push(chunk);
}
this._wants_data = go;
}
static waitForData(chunk) {
console.log("Waiting for data");
const int32Out = new Int32Array(chunk.sab);
Atomics.wait(int32Out, 0, 0x00000000);
Atomics.wait(int32Out, 0, 0x00000001);
}
static releaseData(chunk) {
console.log("Releasing data for overwrite");
const int32Out = new Int32Array(chunk.sab);
Atomics.store(int32Out, 0, 0x00000001);
Atomics.wake(int32Out, 0);
}
static readyForNextChunk(chunk) {
console.log("Releasing chunk emission");
const int32Out = new Int32Array(chunk.sab);
Atomics.store(int32Out, 0, 0x00000000);
Atomics.wake(int32Out, 0);
}
}
export default EMScriptenStream;
/* global Atomics, SharedArrayBuffer */
import stream from "stream";
class EMScriptenWorkerStream extends stream.Writable {
constructor(options, signal) {
super(options);
this._signal = signal;
this._targetSize = options.targetSize;
this._inputBuffers = [];
this._inputBufferPtr = 0;
this._inputSize = 0;
this._outputBuffer = new Buffer(this._targetSize + 8196);
this._outputSize = 0;
this._sab = new SharedArrayBuffer(this._targetSize + 4);
this._totalSize = 0;
this._index = 0;
this._noMoreData = false;
this._writeCallbacks = [];
}
_dump() {
if (0 === this._outputSize) {
return;
}
const origLength = this._outputSize;
const int32Out = new Int32Array(this._sab);
console.log("New chunk ready. Waiting for permission to overwrite buffer.");
Atomics.wait(int32Out, 0, 0x00000002);
console.log("We can overwrite the buffer now.");
const chunkSize = Math.min(this._targetSize, this._outputSize);
const byteOut = new Uint8Array(this._sab).slice(4, chunkSize + 4);
this._outputBuffer.copy(byteOut, 0, 0, chunkSize);
this._outputSize = origLength - chunkSize;
if (0 < this._outputSize) {
const remainder = this._outputBuffer.slice(chunkSize);
remainder.copy(this._outputBuffer, 0, 0, this._outputSize);
}
this._totalSize += chunkSize;
const lastChunk = this.lastChunk();
// We wait for Free-to-Emit signal.
console.log("Data is ready. Waiting for permission to emit chunk");
Atomics.wait(int32Out, 0, 0x00000001);
console.log("We can emit the chunk.");
this._signal({
chunk: {
sab: this._sab,
data: byteOut,
size: chunkSize,
totalSize: lastChunk ? this._totalSize : (this._totalSize + chunkSize),
isLast: lastChunk,
index: this._index,
},
});
// We signal Free-to-Read signal
Atomics.store(int32Out, 0, 0x00000002);
Atomics.wake(int32Out, 0);
console.log("Emitted chunk: ", chunkSize, lastChunk);
this._index += 1;
}
_write(chunk, encoding, done) {
this._inputBuffers.push(chunk);
this._inputSize += chunk.byteLength;
// this._writeCallbacks.push(done);
if (done) {
done();
}
}
_final() {
this._noMoreData = true;
this._dump();
}
lastChunk() {
return this._noMoreData;
}
processData(byte) {
if (this._noMoreData) {
console.log("ERROR!!! Received data when we shouldn't anymore");
}
this._outputBuffer.writeInt8(byte, this._outputSize);
this._outputSize += 1;
while (this._outputSize >= this._targetSize) {
this._dump();
}
}
nextInputChar() {
const buffer = this._inputBuffers[0];
if (!buffer) {
return undefined;
}
const c = buffer[this._inputBufferPtr];
this._inputBufferPtr += 1;
this._inputSize -= 1;
if (buffer.byteLength <= this._inputBufferPtr) {
this._inputBufferPtr = 0;
this._inputBuffers = this._inputBuffers.slice(1);
}
if (this.readyForData()) {
this._writeCallbacks.forEach((cb) => {
cb();
});
this._writeCallbacks = [];
}
return c;
}
readyForData() {
return this._inputSize < this._targetSize * 2;
}
}
export default EMScriptenWorkerStream;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment