Created
April 28, 2015 14:36
-
-
Save domenic/2fbc9719d72316b04166 to your computer and use it in GitHub Desktop.
ReadableStream.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(function() { | |
'use strict'; | |
const readableStreamClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]'); | |
const readableStreamCloseRequested = %CreatePrivateOwnSymbol('[[closeRequested]]'); | |
const readableStreamController = %CreatePrivateOwnSymbol('[[controller]]'); | |
const readableStreamPullAgain = %CreatePrivateOwnSymbol('[[pullAgain]]'); | |
const readableStreamPulling = %CreatePrivateOwnSymbol('[[pulling]]'); | |
const readableStreamQueue = %CreatePrivateOwnSymbol('[[queue]]'); | |
const readableStreamReader = %CreatePrivateOwnSymbol('[[reader]]'); | |
const readableStreamStarted = %CreatePrivateOwnSymbol('[[started]]'); | |
const readableStreamState = %CreatePrivateOwnSymbol('[[state]]'); | |
const readableStreamStoredError = %CreatePrivateOwnSymbol('[[storedError]]'); | |
const readableStreamStrategySize = %CreatePrivateOwnSymbol('[[strategySize]]'); | |
const readableStreamStrategyHWM = %CreatePrivateOwnSymbol('[[strategyHWM]]'); | |
const readableStreamUnderlyingSource = %CreatePrivateOwnSymbol('[[underlyingSource]]'); | |
const readableStreamControllerControlledReadableStream = %CreatePrivateOwnSymbol('[[controlledReadableStream]]'); | |
const readableStreamReaderClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]'); | |
const readableStreamReaderClosedPromise_resolve = %CreatePrivateOwnSymbol('[[closedPromise]] resolve'); | |
const readableStreamReaderClosedPromise_reject = %CreatePrivateOwnSymbol('[[closedPromise]] reject'); | |
const readableStreamReaderOwnerReadableStream = %CreatePrivateOwnSymbol('[[ownerReadableStream]]'); | |
const readableStreamReaderReadRequests = %CreatePrivateOwnSymbol('[[readRequests]]'); | |
const readableStreamReaderState = %CreatePrivateOwnSymbol('[[state]]'); | |
const readableStreamReaderStoredError = %CreatePrivateOwnSymbol('[[storedError]]'); | |
const queueSize = %CreatePrivateOwnSymbol('Queue-with-sizes queue size'); | |
const readRequestPromise = %CreatePrivateOwnSymbol('[[promise]]'); | |
const readRequestResolve = %CreatePrivateOwnSymbol('[[promise]] resolve'); | |
const readRequestReject = %CreatePrivateOwnSymbol('[[promise]] reject'); | |
// TODO(domenic): come up with a better way of getting at intrinsics
// Snapshot the built-ins once, at setup time, so that the rest of this file
// does not look them up again through objects that later code could modify.
const { Number, TypeError, RangeError, Promise } = global;

const Number_isNaN = Number.isNaN;
const Promise_resolve = Promise.resolve.bind(Promise);
const Promise_reject = Promise.reject.bind(Promise);

// uncurryThis(f) yields a function taking the receiver as its first argument:
// uncurryThis(f)(receiver, ...args) behaves like f.call(receiver, ...args).
const uncurryThis = Function.prototype.bind.bind(Function.prototype.call);

const applyFunction = uncurryThis(Function.prototype.apply);
const thenPromise = uncurryThis(Promise.prototype.then);
const shiftArray = uncurryThis(Array.prototype.shift);
const pushArray = uncurryThis(Array.prototype.push);
// TODO(domenic): need to censor Function.prototype.toString for these; use V8 API presumably

// A stream whose chunks come from an underlying source object and are consumed
// through a reader obtained via getReader(). All state is kept in
// private-symbol slots on the instance.
class ReadableStream {
  // underlyingSource: object whose optional `start` method is invoked here
  //   with the new controller; defaults to {} when undefined.
  // strategy: object whose `size` and `highWaterMark` properties are read;
  //   defaults to {} when undefined, and highWaterMark defaults to 1.
  // Propagates anything thrown by ValidateAndNormalizeQueuingStrategy or by
  // the source's start() method.
  constructor(underlyingSource, strategy) {
    const source = underlyingSource === undefined ? {} : underlyingSource;
    const queuingStrategy = strategy === undefined ? {} : strategy;

    const size = queuingStrategy.size;
    const requestedHWM = queuingStrategy.highWaterMark;
    const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(
      size,
      requestedHWM === undefined ? 1 : requestedHWM
    );

    this[readableStreamUnderlyingSource] = source;

    // TODO(domenic) use a real queue data structure
    const chunkQueue = [];
    chunkQueue[queueSize] = 0;
    this[readableStreamQueue] = chunkQueue;

    // TODO(domenic) consolidate booleans into a bit field?
    // TODO(domenic) use integers for state? (or put in bit field?)
    this[readableStreamState] = 'readable';
    this[readableStreamStarted] = false;
    this[readableStreamCloseRequested] = false;
    this[readableStreamPulling] = false;
    this[readableStreamPullAgain] = false;
    this[readableStreamReader] = undefined;
    this[readableStreamStoredError] = undefined;
    this[readableStreamStrategySize] = normalizedStrategy.size;
    this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;

    // The controller must be created before the [[controller]] slot is set:
    // its constructor rejects streams that already have one.
    const controller = new ReadableStreamController(this);
    this[readableStreamController] = controller;

    const startResult = InvokeOrNoop(source, 'start', [controller]);
    thenPromise(
      Promise_resolve(startResult),
      () => {
        // Pulling is only allowed once start() has settled successfully.
        this[readableStreamStarted] = true;
        RequestReadableStreamPull(this);
      },
      (startError) => {
        if (this[readableStreamState] === 'readable') {
          return ErrorReadableStream(this, startError);
        }
      }
    );
  }

  // Cancels the stream with the given reason. Returns a promise; it is
  // rejected with a TypeError when `this` is not a ReadableStream or when the
  // stream currently has a reader.
  cancel(reason) {
    if (!IsReadableStream(this)) {
      return Promise_reject(new TypeError(
        'ReadableStream.prototype.cancel can only be used on a ReadableStream'));
    }
    if (IsReadableStreamLocked(this)) {
      return Promise_reject(new TypeError(
        'Cannot cancel a stream that already has a reader'));
    }
    return CancelReadableStream(this, reason);
  }

  // Returns a new ReadableStreamReader for this stream. Throws a TypeError
  // when `this` is not a ReadableStream; the reader constructor throws when
  // the stream is already locked.
  getReader() {
    if (!IsReadableStream(this)) {
      throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
    }
    return AcquireReadableStreamReader(this);
  }
}
// Object handed to the underlying source's start() and pull() methods so the
// source can enqueue chunks into, close, or error the stream it controls.
class ReadableStreamController {
  // stream: the ReadableStream to control. Throws a TypeError when `stream`
  // is not a ReadableStream or already has a controller assigned.
  constructor(stream) {
    if (!IsReadableStream(stream)) {
      throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance');
    }
    const existingController = stream[readableStreamController];
    if (existingController !== undefined) {
      throw new TypeError(
        'ReadableStreamController instances can only be created by the ReadableStream constructor');
    }
    this[readableStreamControllerControlledReadableStream] = stream;
  }

  // The stream's high water mark minus the total size currently queued.
  get desiredSize() {
    if (!IsReadableStreamController(this)) {
      throw new TypeError(
        'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController');
    }
    const controlledStream = this[readableStreamControllerControlledReadableStream];
    return GetReadableStreamDesiredSize(controlledStream);
  }

  // Requests that the stream close. Throws a TypeError when close was already
  // requested or the stream is errored.
  close() {
    if (!IsReadableStreamController(this)) {
      throw new TypeError(
        'ReadableStreamController.prototype.close can only be used on a ReadableStreamController');
    }
    const controlledStream = this[readableStreamControllerControlledReadableStream];
    if (controlledStream[readableStreamCloseRequested] === true) {
      throw new TypeError('The stream has already been closed; do not close it again!');
    }
    if (controlledStream[readableStreamState] === 'errored') {
      throw new TypeError('The stream is in an errored state and cannot be closed');
    }
    return CloseReadableStream(controlledStream);
  }

  // Hands a chunk to the stream. Rethrows the stored error when the stream is
  // errored; throws a TypeError when close has already been requested.
  enqueue(chunk) {
    if (!IsReadableStreamController(this)) {
      throw new TypeError(
        'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
    }
    const controlledStream = this[readableStreamControllerControlledReadableStream];
    if (controlledStream[readableStreamState] === 'errored') {
      throw controlledStream[readableStreamStoredError];
    }
    if (controlledStream[readableStreamCloseRequested] === true) {
      throw new TypeError('stream is closed or draining');
    }
    return EnqueueInReadableStream(controlledStream, chunk);
  }

  // Moves the stream into the errored state with `e` as the stored error.
  // Throws a TypeError unless the stream is currently readable.
  error(e) {
    if (!IsReadableStreamController(this)) {
      throw new TypeError(
        'ReadableStreamController.prototype.error can only be used on a ReadableStreamController');
    }
    const controlledStream = this[readableStreamControllerControlledReadableStream];
    const state = controlledStream[readableStreamState];
    if (state === 'readable') {
      return ErrorReadableStream(controlledStream, e);
    }
    throw new TypeError(`The stream is ${state} and so cannot be errored`);
  }
}
// Exclusive reader for a ReadableStream. While a reader is attached, the
// stream's [[reader]] slot points at it and the stream counts as locked.
class ReadableStreamReader {
  // stream: the ReadableStream to lock. Throws a TypeError when `stream` is
  // not a ReadableStream or is already locked.
  constructor(stream) {
    if (!IsReadableStream(stream)) {
      throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance');
    }
    if (IsReadableStreamLocked(stream)) {
      throw new TypeError('This stream has already been locked for exclusive reading by another reader');
    }

    stream[readableStreamReader] = this;
    this[readableStreamReaderOwnerReadableStream] = stream;

    // TODO(domenic): use integers for state?
    this[readableStreamReaderState] = 'readable';
    this[readableStreamReaderStoredError] = undefined;

    // TODO(domenic): use a real queue data structure
    this[readableStreamReaderReadRequests] = [];

    // TODO(domenic): use faster means of creating/resolving/rejecting promises
    // Keep the settle functions so the closed promise can be settled later,
    // when the reader is released.
    this[readableStreamReaderClosedPromise] = new Promise((resolve, reject) => {
      this[readableStreamReaderClosedPromise_resolve] = resolve;
      this[readableStreamReaderClosedPromise_reject] = reject;
    });

    // A stream that has already finished releases the new reader right away,
    // which copies the stream's final state onto the reader.
    const streamState = stream[readableStreamState];
    const streamIsFinished = streamState === 'closed' || streamState === 'errored';
    if (streamIsFinished) {
      ReleaseReadableStreamReader(this);
    }
  }

  // Promise settled when the reader is released: fulfilled with undefined, or
  // rejected with the stream's stored error when the stream is errored.
  get closed() {
    if (!IsReadableStreamReader(this)) {
      return Promise_reject(
        new TypeError('ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader'));
    }
    return this[readableStreamReaderClosedPromise];
  }

  // Cancels the owning stream. A closed reader resolves immediately; an
  // errored reader rejects with its stored error.
  cancel(reason) {
    if (!IsReadableStreamReader(this)) {
      return Promise_reject(
        new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader'));
    }
    const readerState = this[readableStreamReaderState];
    if (readerState === 'closed') {
      return Promise_resolve(undefined);
    }
    if (readerState === 'errored') {
      return Promise_reject(this[readableStreamReaderStoredError]);
    }
    const ownerStream = this[readableStreamReaderOwnerReadableStream];
    return CancelReadableStream(ownerStream, reason);
  }

  // Returns a promise for the next { value, done } result.
  read() {
    if (!IsReadableStreamReader(this)) {
      return Promise_reject(
        new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
    }
    return ReadFromReadableStreamReader(this);
  }

  // Detaches the reader from its stream. Does nothing when the reader has no
  // owner stream; throws a TypeError while read() calls are still pending.
  releaseLock() {
    if (!IsReadableStreamReader(this)) {
      throw new TypeError(
        'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader');
    }
    if (this[readableStreamReaderOwnerReadableStream] === undefined) {
      return undefined;
    }
    // TODO(domenic): is getting array lengths safe? I don't think so.
    // Might become moot if we have a better data structure.
    const pendingReads = this[readableStreamReaderReadRequests];
    if (pendingReads.length > 0) {
      throw new TypeError(
        'Tried to release a reader lock when that reader has pending read() calls un-settled');
    }
    return ReleaseReadableStreamReader(this);
  }
}
//
// Readable stream abstract operations
//
// Creates a reader for `stream`. The ReadableStreamReader constructor performs
// the validation and throws when the stream is invalid or already locked.
function AcquireReadableStreamReader(stream) {
  const reader = new ReadableStreamReader(stream);
  return reader;
}
// Cancels `stream`: discards queued chunks, closes the stream, and calls the
// underlying source's `cancel` method with `reason`.
//
// Returns a promise that
//   - is fulfilled with undefined when the stream is already closed,
//   - is rejected with the stored error when the stream is errored,
//   - otherwise follows the source's cancel(), fulfilling with undefined.
function CancelReadableStream(stream, reason) {
  const state = stream[readableStreamState];
  if (state === 'closed') {
    return Promise_resolve(undefined);
  }
  if (state === 'errored') {
    return Promise_reject(stream[readableStreamStoredError]);
  }

  // The replacement queue needs its running-total slot initialised, just like
  // the queue created in the constructor. Without it GetTotalQueueSize()
  // returns undefined and the controller's desiredSize becomes NaN.
  const emptyQueue = [];
  emptyQueue[queueSize] = 0;
  stream[readableStreamQueue] = emptyQueue;

  FinishClosingReadableStream(stream);

  const underlyingSource = stream[readableStreamUnderlyingSource];
  const sourceCancelPromise = PromiseInvokeOrNoop(underlyingSource, 'cancel', [reason]);
  // Hide whatever value the source's cancel() produced from the caller.
  return thenPromise(sourceCancelPromise, function() { return undefined; });
}
// Records that closing was requested. When nothing is queued the stream is
// closed immediately; otherwise it closes once the queue has been drained by
// reads. Does nothing when the stream is already closed.
function CloseReadableStream(stream) {
  if (stream[readableStreamState] !== 'closed') {
    stream[readableStreamCloseRequested] = true;
    const queueIsEmpty = stream[readableStreamQueue].length === 0;
    if (queueIsEmpty) {
      return FinishClosingReadableStream(stream);
    }
  }
  return undefined;
}
// Delivers `chunk` to the stream. When the attached reader has a pending
// read, the chunk settles the oldest one directly; otherwise the chunk is
// queued with the size computed by the strategy's size function (1 when the
// strategy has none). Afterwards another pull is requested.
//
// If computing the size or queueing throws, the stream is errored with that
// exception and the exception is rethrown. Does nothing on a closed stream.
function EnqueueInReadableStream(stream, chunk) {
  if (stream[readableStreamState] === 'closed') {
    return undefined;
  }

  const pendingReads = IsReadableStreamLocked(stream)
    ? stream[readableStreamReader][readableStreamReaderReadRequests]
    : undefined;

  if (pendingReads !== undefined && pendingReads.length > 0) {
    const oldestRead = shiftArray(pendingReads);
    oldestRead[readRequestResolve](CreateIterResultObject(chunk, false));
  } else {
    const strategySize = stream[readableStreamStrategySize];
    try {
      const chunkSize = strategySize === undefined ? 1 : strategySize(chunk);
      // The queue slot is read only after the size function has run.
      EnqueueValueWithSize(stream[readableStreamQueue], chunk, chunkSize);
    } catch (enqueueError) {
      ErrorReadableStream(stream, enqueueError);
      throw enqueueError;
    }
  }

  RequestReadableStreamPull(stream);
}
// Puts `stream` into the errored state: discards queued chunks, stores `e` as
// the stream's error, and releases the attached reader (if any) so that its
// closed promise and pending reads are rejected with `e`.
function ErrorReadableStream(stream, e) {
  // The replacement queue needs its running-total slot initialised, just like
  // the queue created in the constructor. Without it GetTotalQueueSize()
  // returns undefined and the controller's desiredSize becomes NaN.
  const emptyQueue = [];
  emptyQueue[queueSize] = 0;
  stream[readableStreamQueue] = emptyQueue;

  stream[readableStreamStoredError] = e;
  stream[readableStreamState] = 'errored';

  if (IsReadableStreamLocked(stream) === true) {
    return ReleaseReadableStreamReader(stream[readableStreamReader]);
  }
}
// Marks the stream closed and, when a reader is attached, releases it.
function FinishClosingReadableStream(stream) {
  stream[readableStreamState] = 'closed';
  if (!IsReadableStreamLocked(stream)) {
    return undefined;
  }
  const attachedReader = stream[readableStreamReader];
  return ReleaseReadableStreamReader(attachedReader);
}
// Returns the stream's high water mark minus the total size currently held
// in its queue.
function GetReadableStreamDesiredSize(stream) {
  const totalQueued = GetTotalQueueSize(stream[readableStreamQueue]);
  const highWaterMark = stream[readableStreamStrategyHWM];
  return highWaterMark - totalQueued;
}
function IsReadableStream(x) { | |
// TODO(domenic): is it safe to allow this to be called on non-objects? | |
return %HasOwnProperty(x, readableStreamUnderlyingSource); | |
} | |
// True while a reader is attached to the stream, i.e. its [[reader]] slot
// holds something other than undefined.
function IsReadableStreamLocked(stream) {
  const attachedReader = stream[readableStreamReader];
  return attachedReader !== undefined;
}
function IsReadableStreamController(x) { | |
return %HasOwnProperty(x, readableStreamControllerControlledReadableStream); | |
} | |
function IsReadableStreamReader(x) { | |
return %HasOwnProperty(x, readableStreamReaderOwnerReadableStream); | |
} | |
// Produces a promise for the reader's next { value, done } result.
//   - closed reader: fulfilled with { value: undefined, done: true }
//   - errored reader: rejected with the reader's stored error
//   - chunk available: fulfilled with that chunk; the stream is then either
//     closed (close requested and queue drained) or asked to pull again
//   - nothing queued: a read request is recorded and its promise returned;
//     it is settled later by an enqueue or by the reader being released
function ReadFromReadableStreamReader(reader) {
  const readerState = reader[readableStreamReaderState];
  if (readerState === 'closed') {
    return Promise_resolve(CreateIterResultObject(undefined, true));
  }
  if (readerState === 'errored') {
    return Promise_reject(reader[readableStreamReaderStoredError]);
  }

  const stream = reader[readableStreamReaderOwnerReadableStream];
  const chunkQueue = stream[readableStreamQueue];

  if (chunkQueue.length === 0) {
    // Nothing buffered: park a request until a chunk arrives.
    const pendingRead = {};
    pendingRead[readRequestPromise] = new Promise((resolve, reject) => {
      pendingRead[readRequestResolve] = resolve;
      pendingRead[readRequestReject] = reject;
    });
    pushArray(reader[readableStreamReaderReadRequests], pendingRead);
    RequestReadableStreamPull(stream);
    return pendingRead[readRequestPromise];
  }

  const chunk = DequeueValue(chunkQueue);
  const drainedAfterCloseRequest =
    stream[readableStreamCloseRequested] === true && chunkQueue.length === 0;
  if (drainedAfterCloseRequest) {
    FinishClosingReadableStream(stream);
  } else {
    RequestReadableStreamPull(stream);
  }
  return Promise_resolve(CreateIterResultObject(chunk, false));
}
// Detaches `reader` from its owner stream and copies the stream's final state
// onto the reader.
//   - owner stream errored: the reader becomes 'errored', and its closed
//     promise and every pending read are rejected with the stored error.
//   - otherwise: the reader becomes 'closed', its closed promise is fulfilled
//     with undefined, and every pending read is fulfilled with
//     { value: undefined, done: true }.
// Afterwards the pending-read list is emptied and both the stream's [[reader]]
// slot and the reader's [[ownerReadableStream]] slot are cleared.
function ReleaseReadableStreamReader(reader) {
  const ownerReadableStream = reader[readableStreamReaderOwnerReadableStream];
  const readRequests = reader[readableStreamReaderReadRequests];

  // The pending reads are walked by index rather than with for...of, because
  // for...of goes through the iterator protocol, which user code can replace.
  // An overridden iterator would leave pending reads unsettled forever.
  if (ownerReadableStream[readableStreamState] === 'errored') {
    reader[readableStreamReaderState] = 'errored';
    const e = ownerReadableStream[readableStreamStoredError];
    reader[readableStreamReaderStoredError] = e;
    reader[readableStreamReaderClosedPromise_reject](e);
    for (let i = 0; i < readRequests.length; ++i) {
      readRequests[i][readRequestReject](e);
    }
  } else {
    reader[readableStreamReaderState] = 'closed';
    reader[readableStreamReaderClosedPromise_resolve](undefined);
    for (let i = 0; i < readRequests.length; ++i) {
      readRequests[i][readRequestResolve](CreateIterResultObject(undefined, true));
    }
  }

  reader[readableStreamReaderReadRequests] = [];
  ownerReadableStream[readableStreamReader] = undefined;
  reader[readableStreamReaderOwnerReadableStream] = undefined;
}
// Calls the underlying source's `pull` method when the stream wants more data.
// Only one pull is in flight at a time: a request made while a pull is still
// pending just sets the pull-again flag, which triggers one follow-up request
// once the pending pull's promise is fulfilled. When pull rejects and the
// stream is still readable, the stream is errored with the rejection reason.
function RequestReadableStreamPull(stream) {
  if (!ShouldReadableStreamPull(stream)) {
    return undefined;
  }

  if (stream[readableStreamPulling] === true) {
    stream[readableStreamPullAgain] = true;
    return undefined;
  }
  stream[readableStreamPulling] = true;

  const onPullFulfilled = () => {
    stream[readableStreamPulling] = false;
    if (stream[readableStreamPullAgain] === true) {
      stream[readableStreamPullAgain] = false;
      return RequestReadableStreamPull(stream);
    }
  };
  const onPullRejected = (pullError) => {
    if (stream[readableStreamState] === 'readable') {
      return ErrorReadableStream(stream, pullError);
    }
  };

  const source = stream[readableStreamUnderlyingSource];
  const controller = stream[readableStreamController];
  const pullPromise = PromiseInvokeOrNoop(source, 'pull', [controller]);
  thenPromise(pullPromise, onPullFulfilled, onPullRejected);
}
// Decides whether the underlying source should be asked for more data.
// Never pulls when the stream is closed or errored, when close has been
// requested, or before start() has completed. Otherwise pulls when the
// attached reader has pending reads, or when the desired size is positive.
function ShouldReadableStreamPull(stream) {
  const state = stream[readableStreamState];
  const isFinished = state === 'closed' || state === 'errored';
  if (isFinished ||
      stream[readableStreamCloseRequested] === true ||
      stream[readableStreamStarted] === false) {
    return false;
  }

  if (IsReadableStreamLocked(stream)) {
    const pendingReads = stream[readableStreamReader][readableStreamReaderReadRequests];
    if (pendingReads.length > 0) {
      return true;
    }
  }

  return GetReadableStreamDesiredSize(stream) > 0;
}
// TODO TeeReadableStream
//
// Queue-with-sizes
//
// TODO(domenic): manipulating arrays seems fraught with peril in general; e.g. if someone defines getters/setters
// on the prototype chain, we can no longer shift and push.
// Removes the oldest { value, size } pair from `queue` and returns its value.
//
// The queue's running total (the queueSize slot, maintained by
// EnqueueValueWithSize) is reduced by the removed pair's size. Without this
// the total only ever grows, so the desired size never recovers after chunks
// are read and the source is never asked to pull again.
//
// Throws a TypeError when the queue is empty; callers check length first.
function DequeueValue(queue) {
  const pair = shiftArray(queue);
  if (queue.length === 0) {
    // Reset exactly, so floating-point drift cannot leave a residue.
    queue[queueSize] = 0;
  } else {
    queue[queueSize] -= pair.size;
  }
  return pair.value;
}
// Appends { value, size } to `queue` and adds the size to the queue's running
// total. `size` is converted with Number() first; a RangeError is thrown when
// the result is NaN or infinite, in which case the queue is left untouched.
function EnqueueValueWithSize(queue, value, size) {
  const numericSize = Number(size);
  const isFiniteNumber =
    !Number_isNaN(numericSize) && numericSize !== +Infinity && numericSize !== -Infinity;
  if (!isFiniteNumber) {
    throw new RangeError('size must be a finite, non-NaN number.');
  }
  // TODO(domenic): is adding numbers safe? Overridden valueOf could ruin our day.
  queue[queueSize] += numericSize;
  pushArray(queue, { value, size: numericSize });
}
// Returns the running total stored in the queue's queueSize slot.
function GetTotalQueueSize(queue) {
  const totalSize = queue[queueSize];
  return totalSize;
}
//
// Other helpers
//
// Validates the two queuing-strategy fields and returns them as
// { size, highWaterMark }, with highWaterMark converted via Number().
// Throws:
//   TypeError  - `size` is neither undefined nor a function
//   TypeError  - highWaterMark converts to NaN
//   RangeError - highWaterMark is negative
function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
  const sizeIsAcceptable = size === undefined || typeof size === 'function';
  if (!sizeIsAcceptable) {
    throw new TypeError('size property of a queuing strategy must be a function');
  }

  const numericHWM = Number(highWaterMark);
  if (Number_isNaN(numericHWM)) {
    throw new TypeError('highWaterMark property of a queuing strategy must be convertible to a non-NaN number');
  }
  if (numericHWM < 0) {
    throw new RangeError('highWaterMark property of a queuing strategy must be nonnegative');
  }

  return { size, highWaterMark: numericHWM };
}
// Calls O[P] with receiver O and the arguments in `args`, returning its
// result. Returns undefined without calling anything when O[P] is undefined.
// Exceptions from the property lookup or the call propagate to the caller.
function InvokeOrNoop(O, P, args) {
  const method = O[P];
  return method === undefined ? undefined : applyFunction(method, O, args);
}
// Promise-returning variant of InvokeOrNoop: calls O[P] with receiver O and
// `args`, and returns a promise resolved with the result. A missing method
// yields a promise fulfilled with undefined. Anything thrown while looking up
// or calling the method becomes a rejected promise rather than an exception.
function PromiseInvokeOrNoop(O, P, args) {
  try {
    const method = O[P];
    if (method === undefined) {
      return Promise_resolve(undefined);
    }
    return Promise_resolve(applyFunction(method, O, args));
  } catch (invokeError) {
    return Promise_reject(invokeError);
  }
}
// Builds the { value, done } result object that read() promises resolve with.
function CreateIterResultObject(value, done) {
  const iterResult = { value: value, done: done };
  return iterResult;
}
//
// Exports
//
// Install ReadableStream on the global object as a non-enumerable, writable,
// configurable data property.
const readableStreamExportDescriptor = {
  value: ReadableStream,
  writable: true,
  enumerable: false,
  configurable: true
};
Object.defineProperty(global, 'ReadableStream', readableStreamExportDescriptor);
})(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment