Sink 对 Source 有两种请求:
read
: 向 Source 请求数据,通过read(null, currCb)
来请求。abort
: 通知 Source 终止当前的 stream,通过read(endOrError, currCb)
来请求。endOrError
的类型是true | Error
,表示是一次正常的终止还是异常的终止。
对于 read
请求,Sink 应该基于 上一次的 read
请求回调了,再发起新的 read
请求这样的规则。
对于 abort
请求,由于有时 Source 也不知道何时需要 abort
(出现了异常或者外部其他条件触发的),因此可能在上一次 read
没有回调就发生。
- 对于 Sink 的
read
请求,如果”重入“(上一次read
没有结束,新的read
来了),Source 完成实际资源的读取后,仅仅调用最后一个传入的currCb
来传递数据。- 其”法理“在于,仅当 Source 真的有了数据,才可能返回给 Sink。这中间无论 Sink "催促"(
read
)多少次,Source 都不可能有数据返回,唯一能做的就是把currCb
更新到prevCb
。毕竟是 Sink 没有遵守 “新的读取要在获得了上一次数据之后发起“ 这条规则。Source 为了避免相同的数据多次回调 Sink,因此必然要在回调prevCb
还是回调currCb
之间选择一个,显然回调currCb
更合理。 - 如果我们把
read
理解成是 Sink 向 Source 传递了一个装数据的”杯子“,那么在”杯子“被装上新的数据返回给 Sink 之前,Sink 重复发read
就是不断地给Source
”新杯子“,Source 只能丢弃老”杯子“来确保数据只能返回一次。
- 其”法理“在于,仅当 Source 真的有了数据,才可能返回给 Sink。这中间无论 Sink "催促"(
- 当 Sink 请求 Source "终止(abort)"时 (
read(true)
),此时 Source 的处理逻辑是不一样的。首先 Source 会马上终止上一次未完成的prevCb(true)
,然后进行必要的清理工作,清理完成后currCb(true)
,针对当前的请求也进行了回调。- 这样处理的”法理“在于,Sink 发起的终止 Source 要尽快执行,而且要尽快回调 Sink 其完成了”终止“任务。并且,如果有
prevCb
存在,这个prevCb
不能再正常返回数据了,一定要返回终止(prevCb (true)
),因为此时 Sink 已经处于”结束“状态,回调正常的数据反而“不合理”。与此同时,上一次的read
相关联的异步调用在调用完成后,也不应该再回调 Sink了(pervCb(null, data)
),因为这个prevCb
已经可能随着最近的abort
请求被提前回调了(prevCb(true)
)。 - 和
read
的实现不一样(prevCb
总是被currCb
覆盖),abort
在实现的时候对于prevCb
和currCb
都会进行回调(prevCb (true)
和currCb (true)
)。之所以逻辑不一样,我”猜想“逻辑在于:abort
在 Sink 这边可能是一个突发需求,因为异常或者什么外部条件触发的,发生的时机和上一次read
是否回调完成无关。因此 Source 不能把abort
理解成是一次”杯子“的替代。Source 需要终止往最后一次的”杯子“里放数据的行为(prevCb (true)
),也需要响应最近的这次abort
的回调。
- 这样处理的”法理“在于,Sink 发起的终止 Source 要尽快执行,而且要尽快回调 Sink 其完成了”终止“任务。并且,如果有
如上文,Source 的实现至少需要跟踪以下几个状态:
prevCb
: 记录 Sink 最近的read
请求的回调函数。- 每次 Sink 发起
read(null, currCb)
请求,Source 会设置prevCb = currCb;
,换个新”杯子“。 - 当 Source 准备好数据,就会执行
const tempCb = prevCb; prevCb = null; tempCb(null, data);
来返回数据并清除prevCb
。
- 每次 Sink 发起
ended
: Source 用来记录当前 stream 是否已经终止。- 当 Sink 发起
abort
请求时,Source 往往立刻设置ended
为 Sink 传入的终止请求,例如:
- 当 Sink 发起
let prevCb = null;
let ended = null;
function read(endOrError, currCb) {
if (!currCb) throw new Error("read must have cb");
// 下游是否需要停止
if (endOrError) {
// 如需要,记录下游需求
ended = endOrError;
if (prevCb) {
callback(endOrError)
}
// 完成清理工作,清理后完成 abort 请求的回调
cleanUp(function(){
currCb(endOrError)
})
}
prevCb = currCb
readNext()
}
function readNext() {
if (!prevCb) return;
// 有之前的终止请求
if (ended) {
callback(ended);
return;
}
fs.read(FILENAME, (err, data) {
if (ended) {
callback(ended);
return;
}
if (err) {
ended = err;
callback(ended);
} else {
callback(null, data);
}
})
}
function callback(endOrError, data) {
const tempCb = prevCb
// 还债且清除欠债
prevCb = null
tempCb(endOrError, data)
}