Created
September 19, 2018 22:00
-
-
Save csabatuz/b21e410d9dd463a27e64657e1b92fb97 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
import * as BrowserFS from "browserfs"; | |
/** Maximum number of bytes read from the input file per stdin refill (50 MiB). */
const IO_LENGTH = 50 * 1024 * 1024;

/**
 * Supported ways of feeding the input file to the Emscripten program.
 * Frozen so that the exported object cannot be altered by consumers.
 */
const INPUT_TYPE = Object.freeze({
    STDIN: "stdin",
    WORKERFS: "workerfs",
});

/**
 * Supported destinations for the program's output.
 * Frozen so that the exported object cannot be altered by consumers.
 */
const OUTPUT_TYPE = Object.freeze({
    STDOUT: "stdout",
    MEMFS: "memfs",
    IDBFS: "idbfs",
    HTML5: "html5",
});
/**
 * Reinterprets a byte value as a signed 8-bit quantity.
 *
 * Values greater than 127 are shifted down by 256; every other value is
 * returned untouched.
 *
 * @param {number} b the value to convert
 * @return {number} the signed representation
 */
const getInt8 = (b) => {
    const INT8_MAX = 0x7f;
    const BYTE_SPAN = 0x100;
    return b > INT8_MAX ? b - BYTE_SPAN : b;
};
/**
 * Fills in `config.infile` and `config.outfile` according to the selected
 * input and output forms. The config object is modified in place; a form
 * that matches none of the known types leaves the corresponding property
 * untouched.
 *
 * @param {Object} config configuration carrying `inputForm` and `outputForm`
 * @param {{name: string}} file the file whose name is used to build the paths
 */
const processConfig = (config, file) => {
    const inputForm = config.inputForm;
    if (inputForm === INPUT_TYPE.WORKERFS) {
        config.infile = `/data/${file.name}`;
    } else if (inputForm === INPUT_TYPE.STDIN) {
        config.infile = "/dev/stdin";
    }

    const outputForm = config.outputForm;
    if (outputForm === OUTPUT_TYPE.STDOUT) {
        config.outfile = "/dev/stdout";
    } else if (outputForm === OUTPUT_TYPE.MEMFS) {
        config.outfile = `/work/${file.name}`;
    } else if (outputForm === OUTPUT_TYPE.IDBFS) {
        config.outfile = `/idbfs/${file.name}`;
    } else if (outputForm === OUTPUT_TYPE.HTML5) {
        config.outfile = `/html5fs/${file.name}`;
    }
};
/**
 * Runs an Emscripten program against `file` and forwards its output to
 * `stream`.
 *
 * The required filesystems are mounted first (asynchronously when the HTML5
 * output form needs BrowserFS), then `runner` is invoked with the mounts, the
 * arguments and the stdin/stdout callbacks. For non-stdout output forms the
 * first result file is pushed byte by byte into `stream` afterwards.
 *
 * @param {Object} config configuration with `inputForm` / `outputForm`
 * @param {Function} runner the Emscripten entry point; receives an options object
 * @param {Array} args command line arguments handed to the program
 * @param {Object} stream stream exposing `processData`, `write`,
 *     `readyForData`, `nextInputChar` and `end`
 * @param {Blob} file the input file
 * @param {Object} worker worker handle passed to the BrowserFS "WorkerFS" backend
 * @return {Promise<boolean>} resolves to `true` when the program exited with a
 *     non-zero code, `false` otherwise; rejects when BrowserFS cannot be configured
 */
const execute = (config, runner, args, stream, file, worker) => {
    let inErrorState = false;
    let inputPos = 0;
    const totalSize = file.size;
    // Drives to mount
    const mounts = [];
    const mountPromise = new Promise((resolve, reject) => {
        // NOTE(review): INPUT_TYPE declares no IDBFS member in this file, so the
        // second comparison is against `undefined` - confirm the intent.
        if (config.outputForm === OUTPUT_TYPE.IDBFS || config.inputForm === INPUT_TYPE.IDBFS) {
            mounts.push({type: "IDBFS", opts: {}, mountpoint: "/idbfs"});
        }
        if (config.outputForm === OUTPUT_TYPE.MEMFS) {
            mounts.push({type: "MEMFS", opts: {}, mountpoint: "/output"});
        }
        if (config.inputForm === INPUT_TYPE.WORKERFS) {
            mounts.push({type: "WORKERFS", opts: {files: [file]}, mountpoint: "/data"});
        }
        if (config.outputForm !== OUTPUT_TYPE.HTML5) {
            resolve();
            return;
        }
        BrowserFS.configure({
            fs: "AsyncMirror",
            options: {
                sync: {
                    fs: "InMemory",
                },
                async: {
                    fs: "WorkerFS",
                    options: {worker},
                },
            },
        }, (e) => {
            if (e) {
                console.log(e);
                // Throwing from this asynchronous callback would leave the
                // promise pending forever; reject so the caller sees the error.
                reject(e);
                return;
            }
            mounts.push({fs: new BrowserFS.EmscriptenFS(), opts: {}, mountpoint: "/html5fs"});
            resolve();
        });
    });
    /**
     * Gets a single byte written to stdout.
     *
     * @param {number} byte
     */
    const stdout = config.outputForm !== OUTPUT_TYPE.STDOUT ? () => {} : (byte) => {
        if ("number" === typeof byte) {
            stream.processData(byte);
        }
    };
    /**
     *
     * Expected to return the next char in stdin.
     * - `null` means EOF
     * - undefined means I/O wait
     * - byte value means char on stdin
     *
     * @return {number} a byte, null, or undefined
     */
    const stdin = () => {
        if (config.inputForm !== INPUT_TYPE.STDIN) {
            return undefined;
        }
        // Read a new file slice if needed
        if (stream.readyForData() && inputPos < totalSize) {
            const sliceEnd = Math.min(totalSize, inputPos + IO_LENGTH);
            const slice = file.slice(inputPos, sliceEnd);
            const reader = new FileReaderSync();
            const buf = reader.readAsArrayBuffer(slice);
            stream.write(Buffer.from(buf));
            inputPos = sliceEnd;
        }
        // get the first char in the input buffer (FIFO)
        const c = stream.nextInputChar();
        if (inputPos >= totalSize && c === undefined) {
            return null;
        }
        return c;
    };
    return mountPromise.then(() => {
        const result = runner({
            mounts,
            arguments: args,
            stdout,
            stdin,
            onExit(code) {
                // this stream should be end()-ed and closed off
                if (0 !== code) {
                    inErrorState = true;
                }
                console.log("EMScripten operation finished with code: ", code);
            },
        });
        if (config.outputForm !== OUTPUT_TYPE.STDOUT && !inErrorState) {
            // NOTE(review): `config.idbfs` is not assigned anywhere in this
            // file - verify that callers set it for the IDBFS output form.
            const view = config.idbfs ? new Int8Array(result.IDBFS[0].data) : result.MEMFS[0].data;
            for (let i = 0; i < view.length; ++i) {
                stream.processData(getInt8(view[i]));
            }
        }
        stream.end();
        return inErrorState;
    });
};
module.exports = { | |
INPUT_TYPE, | |
OUTPUT_TYPE, | |
processConfig, | |
execute, | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
/* global Atomics */ | |
import stream from "stream"; | |
/**
 * Object-mode Readable that re-emits the chunks a worker posts as
 * `{type: "chunk", chunk}` messages.
 *
 * Chunks are queued while the consumer applies backpressure and pushed
 * directly as soon as the stream has asked for more data. The static helpers
 * operate on the 32-bit flag stored in the first word of `chunk.sab`.
 *
 * NOTE(review): nothing in this class pushes `null`, so the stream never
 * signals end-of-data by itself - confirm whether consumers rely on a field
 * of the chunk instead.
 */
class EMScriptenStream extends stream.Readable {
    /**
     * @param {Object} [options] Readable options; `objectMode` is always forced on
     * @param {Object} worker object exposing `addEventListener("message", cb)`
     */
    constructor(options, worker) {
        // Copy instead of mutating the caller's object; also tolerates a
        // missing `options` argument.
        super(Object.assign({}, options, {objectMode: true}));
        this._worker = worker;
        this._backpressure = false;
        this._chunks = [];
        this._wants_data = false;
        this._worker.addEventListener("message", (e) => {
            if ("chunk" !== e.data.type) {
                return;
            }
            this._chunks.push(e.data.chunk);
            if (this._wants_data) {
                // Keep FIFO order: always hand out the oldest queued chunk.
                this._wants_data = this.push(this._chunks.shift());
            }
        });
    }

    /**
     * Drains queued chunks until the queue is empty or `push` reports
     * backpressure, and remembers whether more data is still wanted.
     *
     * @param {number} size ignored
     */
    _read(size) {
        let go = true;
        while (0 !== this._chunks.length && go) {
            go = this.push(this._chunks.shift());
        }
        this._wants_data = go;
    }

    /**
     * Builds a view over the flag word only, so the buffer length does not
     * have to be a multiple of four.
     *
     * @param {{sab: SharedArrayBuffer}} chunk
     * @return {Int32Array} one-element view at byte offset 0
     */
    static _flagView(chunk) {
        return new Int32Array(chunk.sab, 0, 1);
    }

    /**
     * Wakes agents waiting on the flag word. `Atomics.wake` was renamed to
     * `Atomics.notify`; prefer the new name and fall back to the old one.
     *
     * @param {Int32Array} flags
     */
    static _notify(flags) {
        const notify = Atomics.notify || Atomics.wake;
        notify(flags, 0);
    }

    /**
     * Blocks while the flag word is 0 and then while it is 1.
     *
     * @param {{sab: SharedArrayBuffer}} chunk
     */
    static waitForData(chunk) {
        console.log("Waiting for data");
        const int32Out = EMScriptenStream._flagView(chunk);
        Atomics.wait(int32Out, 0, 0x00000000);
        Atomics.wait(int32Out, 0, 0x00000001);
    }

    /**
     * Stores 1 in the flag word and wakes waiters.
     *
     * @param {{sab: SharedArrayBuffer}} chunk
     */
    static releaseData(chunk) {
        console.log("Releasing data for overwrite");
        const int32Out = EMScriptenStream._flagView(chunk);
        Atomics.store(int32Out, 0, 0x00000001);
        EMScriptenStream._notify(int32Out);
    }

    /**
     * Stores 0 in the flag word and wakes waiters.
     *
     * @param {{sab: SharedArrayBuffer}} chunk
     */
    static readyForNextChunk(chunk) {
        console.log("Releasing chunk emission");
        const int32Out = EMScriptenStream._flagView(chunk);
        Atomics.store(int32Out, 0, 0x00000000);
        EMScriptenStream._notify(int32Out);
    }
}
export default EMScriptenStream;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* global Atomics, SharedArrayBuffer */ | |
import stream from "stream"; | |
/**
 * Worker-side Writable that buffers input for the program's stdin and
 * collects the program's output bytes, emitting them through `signal` in
 * chunks of at most `targetSize` bytes.
 *
 * The first 32-bit word of the internal SharedArrayBuffer is used as a flag:
 * `_dump` waits while it equals 2 before preparing a chunk, waits while it
 * equals 1 before emitting, and stores 2 once the chunk has been signalled.
 */
class EMScriptenWorkerStream extends stream.Writable {
    /**
     * @param {Object} options Writable options plus `targetSize` (chunk size in bytes)
     * @param {Function} signal called with `{chunk: {...}}` for every emitted chunk
     */
    constructor(options, signal) {
        super(options);
        this._signal = signal;
        this._targetSize = options.targetSize;
        this._inputBuffers = [];
        this._inputBufferPtr = 0;
        this._inputSize = 0;
        this._outputBuffer = Buffer.alloc(this._targetSize + 8196);
        this._outputSize = 0;
        this._sab = new SharedArrayBuffer(this._targetSize + 4);
        this._totalSize = 0;
        this._index = 0;
        this._noMoreData = false;
        this._writeCallbacks = [];
    }

    /**
     * Wakes agents waiting on the flag word. `Atomics.wake` was renamed to
     * `Atomics.notify`; prefer the new name and fall back to the old one.
     *
     * @param {Int32Array} flags
     */
    _notify(flags) {
        const notify = Atomics.notify || Atomics.wake;
        notify(flags, 0);
    }

    /**
     * Emits up to `targetSize` buffered output bytes as one chunk and moves
     * any remaining bytes to the front of the output buffer. Does nothing
     * when no output is buffered.
     */
    _dump() {
        if (0 === this._outputSize) {
            return;
        }
        const origLength = this._outputSize;
        // View only the flag word: `targetSize + 4` need not be a multiple of
        // four, and a full-length Int32Array would then throw a RangeError.
        const int32Out = new Int32Array(this._sab, 0, 1);
        console.log("New chunk ready. Waiting for permission to overwrite buffer.");
        Atomics.wait(int32Out, 0, 0x00000002);
        console.log("We can overwrite the buffer now.");
        const chunkSize = Math.min(this._targetSize, this._outputSize);
        // NOTE(review): TypedArray#slice returns a copy, so the payload is
        // written to a private array rather than into the shared buffer -
        // confirm that consumers read `chunk.data` and not `chunk.sab`.
        const byteOut = new Uint8Array(this._sab).slice(4, chunkSize + 4);
        this._outputBuffer.copy(byteOut, 0, 0, chunkSize);
        this._outputSize = origLength - chunkSize;
        if (0 < this._outputSize) {
            // Shift the bytes that did not fit to the start of the buffer.
            this._outputBuffer.copyWithin(0, chunkSize, origLength);
        }
        this._totalSize += chunkSize;
        const lastChunk = this.lastChunk();
        // We wait for Free-to-Emit signal.
        console.log("Data is ready. Waiting for permission to emit chunk");
        Atomics.wait(int32Out, 0, 0x00000001);
        console.log("We can emit the chunk.");
        this._signal({
            chunk: {
                sab: this._sab,
                data: byteOut,
                size: chunkSize,
                totalSize: lastChunk ? this._totalSize : (this._totalSize + chunkSize),
                isLast: lastChunk,
                index: this._index,
            },
        });
        // We signal Free-to-Read signal
        Atomics.store(int32Out, 0, 0x00000002);
        this._notify(int32Out);
        console.log("Emitted chunk: ", chunkSize, lastChunk);
        this._index += 1;
    }

    /**
     * Queues an incoming input buffer for `nextInputChar` and acknowledges
     * the write immediately.
     *
     * @param {Buffer} chunk
     * @param {string} encoding ignored
     * @param {Function} [done] write completion callback
     */
    _write(chunk, encoding, done) {
        this._inputBuffers.push(chunk);
        this._inputSize += chunk.byteLength;
        if (done) {
            done();
        }
    }

    /**
     * Marks the stream as finished and flushes the buffered output.
     *
     * Writable only emits 'finish' after the callback handed to `_final` has
     * been invoked, so it is called here once the flush is done. When no
     * callback is supplied (direct call) a flush error is thrown instead.
     *
     * NOTE(review): when no output is buffered, `_dump` returns early and no
     * chunk with `isLast: true` is emitted - confirm that is acceptable.
     *
     * @param {Function} [callback] completion callback supplied by Writable
     */
    _final(callback) {
        this._noMoreData = true;
        let failure = null;
        try {
            this._dump();
        } catch (err) {
            failure = err;
        }
        if ("function" === typeof callback) {
            callback(failure);
            return;
        }
        if (failure) {
            throw failure;
        }
    }

    /**
     * @return {boolean} true once `_final` has run
     */
    lastChunk() {
        return this._noMoreData;
    }

    /**
     * Appends one output byte and emits chunks while at least `targetSize`
     * bytes are buffered.
     *
     * @param {number} byte signed 8-bit value
     */
    processData(byte) {
        if (this._noMoreData) {
            console.log("ERROR!!! Received data when we shouldn't anymore");
        }
        this._outputBuffer.writeInt8(byte, this._outputSize);
        this._outputSize += 1;
        while (this._outputSize >= this._targetSize) {
            this._dump();
        }
    }

    /**
     * Pops the next byte from the queued input buffers.
     *
     * @return {number|undefined} the byte, or undefined when nothing is queued
     */
    nextInputChar() {
        const buffer = this._inputBuffers[0];
        if (!buffer) {
            return undefined;
        }
        const c = buffer[this._inputBufferPtr];
        this._inputBufferPtr += 1;
        this._inputSize -= 1;
        if (buffer.byteLength <= this._inputBufferPtr) {
            // Current buffer is exhausted; continue with the next one.
            this._inputBufferPtr = 0;
            this._inputBuffers.shift();
        }
        if (this.readyForData()) {
            this._writeCallbacks.forEach((cb) => {
                cb();
            });
            this._writeCallbacks = [];
        }
        return c;
    }

    /**
     * @return {boolean} true while less than two target sizes of input are queued
     */
    readyForData() {
        return this._inputSize < this._targetSize * 2;
    }
}
export default EMScriptenWorkerStream;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment