-
-
Save nanne007/0e86c7fddae9cf9ebe2970f98ece831c to your computer and use it in GitHub Desktop.
A base class for writing concurrent processes, with abort, until, race and settle.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// licensed under public domain
// author: [email protected]
const EventEmitter = require('events')
// K combinator (constant function), not necessary, just for fun:
// K(x) returns a function that ignores its own argument and always yields x.
const K = x => _ => x
/**
 * Base class for writing concurrent processes, with `abort`, `until`, `race`
 * and `settle`.
 *
 * This class is mainly for settle logic. The concrete class should emit a
 * 'finish' event with err/data at the end of the process.
 *
 * `error` is an observed property (see `observe`): assigning to it
 * re-evaluates every pending `until` waiter, and a truthy `error` rejects
 * all of them.
 */
class Concurrent extends EventEmitter {
  constructor () {
    super()
    // Pending waiters registered by until(): { predicate, resolve, reject }.
    this._untils = []
    this.observe('error', null)
  }

  /**
   * Re-evaluates the pending waiters.
   *
   * If `error` is set, every waiter is rejected with that error and the list
   * is cleared. Otherwise each waiter whose predicate is now truthy is
   * resolved and removed; the others stay pending.
   */
  _until () {
    const waiters = this._untils
    if (this.error) {
      this._untils = []
      for (const waiter of waiters) waiter.reject(this.error)
      return
    }
    const stillWaiting = []
    for (const waiter of waiters) {
      if (waiter.predicate()) waiter.resolve()
      else stillWaiting.push(waiter)
    }
    this._untils = stillWaiting
  }

  /**
   * Awaits `promise`, but gives up early if `error` gets set first.
   *
   * @param {Promise|*} promise - the operation to wait for
   * @returns {Promise<*>} the fulfilment value of `promise`
   * @throws the rejection reason of `promise`, or `this.error` when the error
   *   is set (e.g. via `abort`) before `promise` settles
   */
  async race (promise) {
    let finished = false
    const run = async () => {
      try {
        return await promise
      } finally {
        // Only wake our own waiter when no error occurred; on error the
        // waiter has already been rejected by the `error` setter.
        if (!this.error) {
          finished = true
          this._until()
        }
      }
    }
    // run() must be invoked here: passing the function itself would never
    // await `promise` and never flip `finished`.
    const [result] = await Promise.all([run(), this.until(() => finished)])
    return result
  }

  /**
   * Awaits `promise` to completion, then throws if `error` is set.
   *
   * @param {Promise|*} promise - the operation to wait for
   * @returns {Promise<*>} the fulfilment value of `promise`
   * @throws `this.error` if it is set once `promise` has fulfilled, or the
   *   rejection reason of `promise`
   */
  async settle (promise) {
    const value = await promise
    if (this.error) throw this.error
    return value
  }

  /**
   * Waits until `predicate()` is truthy.
   *
   * The predicate is checked immediately and then again every time an
   * observed property is assigned.
   *
   * @param {Function} predicate - zero-argument condition to wait for
   * @returns {Promise<*>} the truthy predicate value when it already holds,
   *   otherwise `undefined` once it holds later
   * @throws `this.error` when the error is already set or gets set while
   *   waiting
   */
  async until (predicate) {
    const ready = predicate()
    if (ready) return ready
    // A waiter registered after the error was set would never be woken up,
    // so fail right away instead of hanging.
    if (this.error) throw this.error
    return new Promise((resolve, reject) => {
      this._untils.push({ predicate, resolve, reject })
    })
  }

  /**
   * Alias of `until`, kept for backward compatibility.
   *
   * @param {Function} predicate - zero-argument condition to wait for
   * @returns {Promise<*>} see `until`
   */
  async untilAsync (predicate) {
    return this.until(predicate)
  }

  /**
   * Defines an observed property `name` backed by `this['_' + name]`.
   * Every assignment stores the value and re-evaluates the pending waiters.
   *
   * @param {string} name - property name to define on this instance
   * @param {*} value - initial value
   */
  observe (name, value) {
    const backing = `_${name}`
    this[backing] = value
    Object.defineProperty(this, name, {
      get () {
        return this[backing]
      },
      set (next) {
        this[backing] = next
        this._until()
      }
    })
  }

  /**
   * Aborts the process by setting `error`, which rejects every pending
   * `until` / `race` with an Error whose message is 'aborted'.
   */
  abort () {
    this.error = new Error('aborted')
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment