Last active
May 18, 2017 10:47
-
-
Save metasansana/8f206b44923360d0faff2f56fe3f8d9e to your computer and use it in GitHub Desktop.
Attempt at actor model in typescript.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { match } from 'afpl/lib/control/Match'; | |
import { Suspend, Return, liftF } from 'afpl/lib/monad/Free'; | |
import { Maybe, fromAny } from 'afpl/lib/monad/Maybe'; | |
import { IO, safeIO, wrapIO } from 'afpl/lib/monad/IO'; | |
import { Free } from 'afpl/lib/monad/Free'; | |
import { Functor } from 'afpl/lib/data/Functor'; | |
import { identity, compose } from 'afpl/lib/util'; | |
import { SharedBuffer } from 'afpl/lib/async/SharedBuffer'; | |
/** | |
* DuplicateActorPathError | |
*/ | |
export function DuplicateActorPathError(path) { | |
this.message = `The path '${path}' is already in use!`; | |
this.path = path; | |
this.stack = (new Error(this.message)).stack; | |
this.name = this.constructor.name; | |
if (Error.hasOwnProperty('captureStackTrace')) | |
Error.captureStackTrace(this, this.constructor); | |
} | |
DuplicateActorPathError.prototype = Object.create(Error.prototype); | |
DuplicateActorPathError.prototype.constructor = DuplicateActorPathError; | |
export class DroppedMessage { | |
message: string; | |
to: string; | |
from: string; | |
constructor(message, to, from) { | |
this.message = message; | |
this.to = to; | |
this.from = from; | |
} | |
} | |
export class StoredMessage<M> { | |
constructor(public path: string, public message: M) { } | |
} | |
export interface Behaviour { <M, A>(m: M): Instruction<A> } | |
export interface FinalBehaviour { <M>(m: M): void } | |
/* actors */ | |
/** | |
* Template | |
*/ | |
export class Template { | |
constructor( | |
public id: string, | |
public start: Behaviour = noop) { } | |
}; | |
/** | |
* LocalTConfig is the config info a LocalT template expects. | |
*/ | |
export interface LocalTConfig { id: string; start?: Behaviour } | |
/** | |
* LocalT is a template for creating a local actor | |
* @property {string} id | |
* @property {function} start | |
*/ | |
export class LocalT extends Template { | |
constructor({ id, start }: LocalTConfig) { | |
super(id, start); | |
} | |
} | |
/** | |
* Actor | |
*/ | |
export class Actor { | |
constructor( | |
public id: string, | |
public path: string, | |
public behaviour: Behaviour, | |
public mailbox = new SharedBuffer<any>()) { } | |
} | |
/** | |
* ActorL | |
*/ | |
export class ActorL extends Actor { } | |
/** | |
* ActorWT | |
*/ | |
export class ActorWT<N> extends Actor { | |
constructor( | |
public askee: string, | |
public actor: Actor, | |
public next: Getter<N>) { | |
super('wt', askee, noop); | |
} | |
} | |
/** | |
* ActorDOA | |
*/ | |
export class ActorDOA extends Actor { } | |
/* system */ | |
export interface SystemConfig { start: Behaviour; log?: Logger } | |
export interface Logger { <A>(m: A): IO<void> }; | |
export interface TypedMap<T> { [key: string]: T; } | |
/** | |
* Signal | |
*/ | |
export class Signal { } | |
/** | |
* Started is sent to an actor to signal it has been started. | |
*/ | |
export class Started extends Signal { } | |
/** | |
* System acts as the root actor for any process. | |
* | |
* All actors are stored here except for ones that are children in seperate | |
* process. In those cases, communication is passed through the local parent | |
* reference. | |
*/ | |
export class System extends Actor { | |
actors: TypedMap<Actor>; | |
log: Logger; | |
constructor({ start, log = m => safeIO(() => console.log(m)) }: SystemConfig) { | |
super('', '', start); | |
this.actors = { '?': new ActorDOA('?', '?', noop) }; | |
this.log = log; | |
} | |
/** | |
* start the system | |
*/ | |
start(): IO<System> { | |
return evalAxiomChain(this.behaviour(new Started()), this, this); | |
} | |
} | |
/** | |
* system create a new actor system filling in the defaults if not provided. | |
* @summary SystemConf → System | |
*/ | |
export const system = (config: SystemConfig) => new System(config); | |
/* Axioms */ | |
export interface Getter<B> { | |
(a: any): B; | |
} | |
/** | |
* Axiom represents a member of the userland DSL. | |
* | |
* Typically corresponds to one of the actor model axioms. | |
* @abstract | |
*/ | |
export class Axiom<N> implements Functor<N> { | |
constructor(public next?: N | Getter<N>) { } | |
/** | |
* map | |
*/ | |
map<A, B>(f: (a: A) => B): Axiom<B> { | |
return match(this) | |
.caseOf(Receive, identity) | |
.caseOf(Raise, identity) | |
.caseOf(Spawn, ({ template, next }) => new Spawn(template, f(next))) | |
.caseOf(Task, ({ to, forkable, next }) => new Task(forkable, to, f(next))) | |
.caseOf(Tell, ({ to, message, next }) => new Tell(to, message, f(next))) | |
.caseOf(Ask, ({ askee, message, next }) => new Ask(askee, message, compose(f, next))) | |
.caseOf(Effect, ({ runnable, next }) => new Effect(runnable, compose(f, next))) | |
.caseOf(Stream, ({ to, source, next }) => new Stream(to, source, f(next))) | |
.caseOf(CPS, identity) | |
.caseOf(Noop, identity) | |
.end(); | |
} | |
} | |
/** | |
* Spawn | |
*/ | |
export class Spawn<N> extends Axiom<N> { | |
constructor(public template: Template, public next?: N) { | |
super(next); | |
} | |
} | |
/** | |
* Task | |
*/ | |
export class Task<N> extends Axiom<N> { | |
constructor(public forkable: Future, public to: string, public next?: N) { | |
super(next); | |
} | |
} | |
/** | |
* Tell | |
*/ | |
export class Tell<N> extends Axiom<N> { | |
constructor(public to: string, public message: string, public next?: N) { | |
super(next); | |
} | |
} | |
/** | |
* Ask | |
*/ | |
export class Ask<N> extends Axiom<N> { | |
constructor(public askee: string, public message: string, public next: Getter<N> = identity) { | |
super(next); | |
} | |
} | |
/** | |
* Effect | |
*/ | |
export class Effect<R, N> extends Axiom<N> { | |
constructor(public runnable: IO<R>, public next: Getter<N> = identity) { | |
super(next); | |
} | |
} | |
export interface StreamFunction<P> { | |
(f: (p: P) => System): void; | |
} | |
/** | |
* Stream | |
*/ | |
export class Stream<P, N> extends Axiom<N> { | |
constructor(public to: string, public source: StreamFunction<P>, public next?: N) { | |
super(next); | |
} | |
} | |
/** | |
* Receive | |
*/ | |
export class Receive<N> extends Axiom<N> { | |
constructor(public behaviour: Behaviour) { | |
super(); | |
} | |
} | |
export interface CPSFunction { | |
<A>(f: (i: Instruction<A>) => void): void; | |
} | |
/** | |
* CPS | |
*/ | |
export class CPS<N> extends Axiom<N> { | |
constructor(public cont: CPSFunction) { | |
super(); | |
} | |
} | |
/** | |
* Raise | |
*/ | |
export class Raise<N> extends Axiom<N> { | |
constructor(public error: Error) { super(); } | |
} | |
/** | |
* Noop | |
*/ | |
export class Noop<N> extends Axiom<N> { } | |
export type Instruction<A> = Free<Axiom<any>, A>; | |
/** | |
* runAxiomChain | |
*/ | |
export const runAxiomChain = <A>(i: Instruction<A>, a: Actor, s: System): System => | |
evalAxiomChain(i, a, s).run(); | |
/** | |
* evalAxiomChain translates a chain of axioms into the actual | |
* work to be done by the system. | |
*/ | |
export const evalAxiomChain = <A>(ch: Instruction<A>, a: Actor, s: System): IO<System> => | |
match(ch) | |
.caseOf(Suspend, ({ f }) => match(f) | |
.caseOf(Spawn, f => s.log(f).chain(() => evalSpawn(f, a, s))) | |
.caseOf(Task, f => s.log(f).chain(() => evalTask(f, a, s))) | |
.caseOf(Tell, f => s.log(f).chain(() => evalTell(f, a, s))) | |
.caseOf(Ask, f => s.log(f).chain(() => evalAsk(f, a, s))) | |
.caseOf(Effect, f => s.log(f).chain(() => evalEffect(f, a, s))) | |
.caseOf(Stream, f => s.log(f).chain(() => evalStream(f, a, s))) | |
.caseOf(Receive, f => s.log(f).chain(() => evalReceive(f, a, s))) | |
.caseOf(CPS, f => s.log(f).chain(() => evalCPS(f, a, s))) | |
.caseOf(Raise, ({ error }) => { throw error; }) | |
.caseOf(Noop, f => s.log(f).chain(() => wrapIO(s))) | |
.end()) | |
.caseOf(Return, () => wrapIO(s)) | |
.end(); | |
/** | |
* evalSpawn | |
*/ | |
export const evalSpawn = <A>({ template, next }: Spawn<Instruction<A>>, a: Actor, s: System): IO<System> => { | |
let p = makeChildPath(template.id, a.path); | |
return getActorMaybe(p, s) | |
.chain(mb => | |
mb.map(() => evalAxiomChain(raiseDup(p), a, s)) | |
.orJust(() => | |
allocateActor(p, template, s) | |
.chain(() => evalAxiomChain(next, a, s))) | |
.get()); | |
}; | |
/** | |
* evalTask | |
*/ | |
export const evalTask = <A>({ to, forkable, next }: Task<Instruction<A>>, a: Actor, s: System): IO<System> => | |
safeIO(() => { | |
forkable.fork( | |
e => evalAxiomChain(raise(e), a, s).run(), | |
m => evalAxiomChain(tell(to, m), a, s).run()) | |
return s; | |
}).chain(() => evalAxiomChain(next, a, s)) | |
/** | |
* evalTell | |
*/ | |
export const evalTell = <A>(op: Tell<Instruction<A>>, a: Actor, s: System): IO<System> => { | |
let { to, message, next } = op; | |
return pathToActor(to, a, s) | |
.chain(t => feedActor(message, t, a, s)) | |
.chain(() => evalAxiomChain(next, a, s)); | |
}; | |
/** | |
* evalAsk | |
*/ | |
export const evalAsk = <A>(op: Ask<Instruction<A>>, a: Actor, s: System): IO<System> => { | |
let { askee, message, next } = op; | |
return putActor(a.path, new ActorWT(askee, a, next), s) | |
.chain(s => pathToActor(askee, a, s)) | |
.chain(t => feedActor(message, t, a, s)) | |
.map(() => s); | |
}; | |
/** | |
* evalEffect | |
*/ | |
export const evalEffect = <R, A>(eff: Effect<R, Instruction<A>>, a: Actor, s: System): IO<System> => { | |
let { runnable, next } = eff; | |
return runnable.chain(r => evalAxiomChain(next(r), a, s)); | |
}; | |
/** | |
* evalStream | |
*/ | |
export const evalStream = <A, P>({ source, to, next }: Stream<P, Instruction<A>>, a: Actor, s: System): IO<System> => | |
safeIO(() => source(p => runAxiomChain(tell(to, p), a, s))) | |
.chain(() => evalAxiomChain(next, a, s)); | |
/** | |
* evalReceive | |
*/ | |
export const evalReceive = <A>({ behaviour }: Receive<Instruction<A>>, a: Actor, s: System): IO<System> => | |
takeMessage(a).chain(mb => consumeOrStore(mb, behaviour, a, s)); | |
/** | |
* evalCPS | |
*/ | |
export const evalCPS = <A>({ cont }: CPS<A>, a: Actor, s: System): IO<System> => | |
safeIO(() => cont(chain => runAxiomChain(chain, a, s))).map(() => s) | |
/** | |
* raiseDup | |
*/ | |
export const raiseDup = (p: string) => raise(new DuplicateActorPathError(p)); | |
/** | |
* makeChildPath creates a child path given an actor and a child id | |
*/ | |
export const makeChildPath = (id: string, parent: string): string => | |
((parent === '/') || (parent === '')) ? `${parent}${id}` : `${parent}/${id}`; | |
/** | |
* allocateActor creates and puts an actor into the system. | |
*/ | |
export const allocateActor = (p: string, t: Template, s: System): IO<System> => | |
createActor(p, t, s).chain(c => putActor(p, c, s).chain(s => evalNewActor(c, s))); | |
/** | |
* createActor | |
*/ | |
export const createActor = (path: string, t: Template, _s: System): IO<Actor> => match(t) | |
.caseOf(LocalT, t => wrapIO(new ActorL(t.id, path, t.start))) | |
.end(); | |
/** | |
* evalNewActor evals the instructions of a freshly spawned actor. | |
*/ | |
export const evalNewActor = (c: Actor, s: System) => | |
takeBehaviour(c).chain(mb => | |
mb.map(b => evalAxiomChain(b(new Started()), c, s)) | |
.orJust(() => wrapIO(s)) | |
.get()) | |
/** | |
* putActor into the System | |
*/ | |
export const putActor = (path: string, a: Actor, s: System): IO<System> => | |
safeIO(() => { s.actors[path] = a; return s; }); | |
/** | |
* pathToActor resolves an actor address from the system. | |
* If the actor is not found the '?' actor is returned. | |
* @param {string} p The path of the actor | |
* @param {Actor} a The actor making the query | |
* @param {System} s The System. | |
*/ | |
export const pathToActor = (p: string, a: Actor, s: System): IO<Actor> => | |
p === '.' ? | |
wrapIO(a) : getActorMaybe(p, s) | |
.map(mb => mb.orJust(() => s.actors['?']).get()); | |
/** | |
* getActorMaybe is like getActor but wraps the actor in a Maybe. | |
* @param p The path of the actor | |
* @param s The System. | |
*/ | |
export const getActorMaybe = (p: string, s: System): IO<Maybe<Actor>> => | |
getActor(p, s).map(fromAny); | |
/** | |
* getActor retrieves an actor from the system. | |
* @param p The path to the actor. | |
* @param s The System. | |
*/ | |
export const getActor = (p: string, s: System): IO<Actor> => safeIO(() => s.actors[p]) | |
/** | |
* feedActor feeds a message into an actor. | |
* The message may be processed immediately or stored for later. | |
*/ | |
export const feedActor = <A>(m: A, to: Actor, a: Actor, s: System): IO<Actor> => match(to) | |
.caseOf(ActorDOA, to => feedActorDOA(m, to, a, s)) | |
.caseOf(ActorL, to => feedActorL(m, to, a, s)) | |
.caseOf(ActorWT, to => feedActorWT(m, to, a, s)) | |
.end(); | |
/** | |
* feedActorDOA handles bounced messages | |
* @param {DroppedMessage} m | |
* @param {ActorDOA} a | |
*/ | |
export const feedActorDOA = <A>(m: A, to: Actor, a: ActorDOA, s: System): IO<Actor> => | |
s.log(new DroppedMessage(m, to, a.path)).mapIn(a); | |
/** | |
* feedActorL | |
*/ | |
export const feedActorL = <A>(m: A, a: ActorL, _from: Actor, s: System): IO<any> => | |
match(a.behaviour) | |
.caseOf(Function, b => delayIO(() => evalAxiomChain(b(m), a, s))) | |
.orElse(() => storeAuditedMessage(m, a, s)) | |
.end(); | |
/** | |
* feedActorWT | |
*/ | |
export const feedActorWT = <A>(m: A, a: ActorWT<any>, from: Actor, s: System): IO<any> => { | |
let { askee, actor, next } = a; | |
return (askee === from.path) ? | |
putActor(actor.path, actor, s) | |
.chain(s => delayIO(() => evalAxiomChain(next(m), actor, s))) : | |
storeAuditedMessage(m, actor, s); | |
}; | |
/** | |
* takeMessage takes the next message out of an actor's mailbox. | |
*/ | |
export const takeMessage = <A>(a: Actor): IO<Maybe<A>> => a.mailbox.maybeTake(); | |
/** | |
* storeAuditedMessage audits the message then stores it | |
* @param {*} m The message | |
* @param {Actor} a | |
* @param {System} s | |
*/ | |
export const storeAuditedMessage = <M>(m: M, a: Actor, s: System) => | |
s.log(new StoredMessage(a.path, m)).chain(() => storeMessage(m, a)); | |
/** | |
* storeMessage sends a message to an actor's mailbox | |
*/ | |
export const storeMessage = <M>(m: M, a: Actor): IO<null> => match(a) | |
.caseOf(ActorL, a => a.mailbox.put(m)) | |
.end(); | |
/** | |
* takeBehaviour gets the next behaviour of an actor. | |
* The behaviour is removed from the actor to prevent duplicate processing. | |
*/ | |
export const takeBehaviour = (a: Actor): IO<Maybe<Behaviour>> => | |
safeIO(() => { let b = a.behaviour; a.behaviour = null; return fromAny(b); }) | |
export const consumeOrStore = <A>(mb: Maybe<A>, b: Behaviour, a: Actor, s: System): IO<System> => | |
mb.map(m => feedActor(m, a, a, s)).orJust(() => putBehaviour(b, a).chainIn(s)).get() | |
/** | |
* putBehaviour changes the behaviour of an Actor. | |
* @param {Behaviour} b | |
* @param {Actor} a | |
*/ | |
export const putBehaviour = (b: Behaviour, a: Actor): IO<Actor> => | |
safeIO(() => { a.behaviour = b; return a; }) | |
/** | |
* delayIO delays the execution of an IO function | |
*/ | |
export const delayIO = <A>(f: () => IO<A>, n = 100): IO<void> => | |
safeIO(() => void setTimeout(() => f().run(), n)); | |
/** | |
* spawn a new child actor | |
*/ | |
export const spawn = (template: Template): Instruction<any> => liftF(new Spawn(template)); | |
/** | |
* tell another actor a message | |
*/ | |
export const tell = (to: string, message: any): Instruction<any> => | |
liftF(new Tell(to, message)); | |
export const ask = (askee: string, message: any): Instruction<any> => | |
liftF(new Ask(askee, message)); | |
/** | |
* task allows an asynchronous operation to be performed, placing its result in | |
* an actor's mailbox. | |
*/ | |
export const task = (f: Future, to: string = '.'): Instruction<any> => | |
liftF(new Task(f, to)); | |
/** | |
* effect allows a side-effectfull computation to occur. | |
*/ | |
export const effect = <R>(f: () => R): Instruction<any> => liftF(new Effect(safeIO(f))); | |
/** | |
* run an IO operation safely | |
*/ | |
export const run = <R>(io: IO<R>) => liftF(new Effect(io)); | |
/** | |
* stream input into an actor's mailbox | |
*/ | |
export const stream = <P>(source: StreamFunction<P>, to: string = '.'): Instruction<any> => | |
liftF(new Stream(to, source)); | |
/** | |
* receive the next message with the passed behaviour | |
*/ | |
export const receive = (behaviour: Behaviour): Instruction<any> => liftF(new Receive(behaviour)); | |
/** | |
* cps is helpfull when interacting with typical node callback based apis | |
* | |
* The evaluation of the instructions the actor wants executed is delayed | |
* until the passed callback is invoked. | |
* | |
*/ | |
export const cps = (f: CPSFunction): Instruction<any> => liftF(new CPS(f)); | |
/** | |
* finalReceive receives the next message and effectively puts the actor into | |
* an idle state. | |
*/ | |
export const finalReceive = <M>(b: (m: M) => void): Instruction<any> => | |
liftF(new Receive(m => b(m) || noop())); | |
/** | |
* raise an error within the system. | |
* This function gives the supervisor (if any) a chance to | |
* intercept and react to the error. It also terminates | |
* the current chain of instructions. | |
*/ | |
export const raise = (error: Error): Instruction<any> => liftF(new Raise(error)); | |
/** | |
* noop means do nothing, effectively putting the Actor in an idle mode forever. | |
*/ | |
export const noop = (): Instruction<any> => liftF(new Noop()); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment