Last active
July 24, 2017 10:04
-
-
Save matianfu/6d62a350f549bfeda1bbcba78384b5ee to your computer and use it in GitHub Desktop.
A base class for writing concurrent processes, with abort, until, race and settle.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed under the public domain.
// author: [email protected]
const EventEmitter = require('events') | |
// K combinator (not strictly necessary, just for fun): K(x) returns a function
// that ignores its own argument and always yields x. It allows a side effect to
// be sequenced inside an expression, e.g. K(result)(sideEffect()).
const K = (x) => (_y) => x
// This class mainly provides the settle logic.
// A concrete subclass should emit a 'finish' event with err/data at the end of its process.
/**
 * Base class for writing abortable, concurrent processes.
 *
 * State is exposed through "observed" properties (see `observe`). Every
 * assignment to an observed property re-evaluates the predicates registered
 * through `until`, so waiters wake up as soon as their condition holds.
 * The observed `error` property is created by the constructor; once it is
 * truthy, every pending and future waiter is rejected with it.
 */
class Concurrent extends EventEmitter {
  constructor () {
    super()
    // Pending waiters: { predicate, resolve, reject } records created by until().
    this._untils = []
    this.observe('error', null)
  }

  /**
   * Re-evaluates all pending waiters.
   *
   * - If `this.error` is set, each waiter is rejected with that error.
   * - Otherwise a waiter whose predicate returns a truthy value is resolved
   *   with that value; the others stay queued.
   * - A predicate that throws rejects only its own waiter.
   */
  _until () {
    // Detach the queue first so that waiters registered while this loop runs
    // land in the fresh array instead of being dropped.
    const waiters = this._untils
    this._untils = []

    for (const waiter of waiters) {
      if (this.error) {
        waiter.reject(this.error)
        continue
      }

      let value
      try {
        value = waiter.predicate()
      } catch (err) {
        // The failure belongs to the promise waiting on this predicate,
        // not to whoever happened to assign the observed property.
        waiter.reject(err)
        continue
      }

      if (value) {
        waiter.resolve(value)
      } else {
        this._untils.push(waiter)
      }
    }
  }

  /**
   * Races `promise` against an abort/error of this process.
   *
   * @param {Promise<*>} promise - the operation to wait for
   * @returns {Promise<*>} the value of `promise` if it fulfills first
   * @throws rejects with `this.error` if the process errors first, or with
   *   the rejection reason of `promise` if it rejects first
   */
  async race (promise) {
    let finished = false

    const run = async () => {
      try {
        return await promise
      } finally {
        // Only release the companion waiter when no error occurred; in the
        // error case that waiter has already been rejected.
        if (!this.error) {
          finished = true
          this._until()
        }
      }
    }

    // Invoke run() so Promise.all receives a promise, not the function itself.
    const [result] = await Promise.all([run(), this.until(() => finished)])
    return result
  }

  /**
   * Waits for `promise` to fulfill, then fails if the process has errored
   * in the meantime.
   *
   * @param {Promise<*>} promise - the operation to wait for
   * @returns {Promise<*>} the value of `promise`
   * @throws rejects with the reason of `promise` if it rejects, otherwise
   *   with `this.error` when that is set after `promise` fulfilled
   */
  async settle (promise) {
    const value = await promise
    if (this.error) throw this.error
    return value
  }

  /**
   * Waits until `predicate` returns a truthy value.
   *
   * The predicate is evaluated immediately and again after every assignment
   * to an observed property.
   *
   * @param {function(): *} predicate - condition to wait for
   * @returns {Promise<*>} the truthy value returned by the predicate
   * @throws rejects with `this.error` if the process has errored or errors
   *   while waiting, or with whatever the predicate throws
   */
  async until (predicate) {
    // An already-failed process can never satisfy a waiter: fail fast
    // instead of queueing a promise that would never settle.
    if (this.error) throw this.error

    const value = predicate()
    if (value) return value

    return new Promise((resolve, reject) => {
      this._untils.push({ predicate, resolve, reject })
    })
  }

  /**
   * Backward-compatible alias of `until`.
   *
   * @param {function(): *} predicate - condition to wait for
   * @returns {Promise<*>} see `until`
   */
  untilAsync (predicate) {
    return this.until(predicate)
  }

  /**
   * Defines an observed property: reads return the backing field
   * `'_' + name`, writes store the value and then re-evaluate all waiters.
   *
   * @param {string} name - property name to define on this instance
   * @param {*} value - initial value
   */
  observe (name, value) {
    const backing = `_${name}`
    this[backing] = value
    Object.defineProperty(this, name, {
      get () {
        return this[backing]
      },
      set (next) {
        this[backing] = next
        this._until()
      }
    })
  }

  /**
   * Aborts the process by setting `error` to `new Error('aborted')`, which
   * rejects every pending waiter.
   */
  abort () {
    this.error = new Error('aborted')
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
测试回帖含代码: