{proc, chan, receive, send, mult} = require './'
{Generator, isGenerator, compareAndSwap} = require './helpers'
{inspect} = require 'util'
module.exports =
Non-coordinated asynchronous reference type.
class Agent
constructor: (@value, @validate) ->
  # `@value` is the initial state; `@validate`, when given, is consulted by
  # `do` before a new state is accepted.
  @error = null

  # Reader process: keeps yielding `send @value` for as long as it runs.
  @reader = proc =>
    loop
      yield send @value
    return

  # Writer process: receives `[action, args...]` messages one at a time and
  # waits on the process returned by `do` before taking the next message.
  @writer = proc =>
    loop
      message = yield receive()
      [action, args...] = message
      yield receive @do action, args...
    return

  # Both stay null until `listen` is first called.
  @updates = null
  @notifier = null
Implement I/O for `Agent` by delegating to the `writer` and `reader` properties, which reference channels.
# Hands the class prototype and the names of the two process-holding
# properties to `chan.io` (`@prototype` is the long form of `@::`).
chan.io @prototype, 'writer', 'reader'
# Equality used by `compareAndSwap`: identical values are equal; otherwise
# defer to `a.equals(b)` when `a` provides such a method (undefined if not).
isEqual = (a, b) ->
  return true if a is b
  a?.equals? b
# Replaces the state with `newValue` only when the current state still equals
# `oldValue` (per `isEqual`). Returns `yes` if the swap happened, `no` if not.
compareAndSwap: (oldValue, newValue) ->
  return no unless isEqual oldValue, @value
  @value = newValue
  yes
Adds or removes channels to which updates to agent state will be sent.
# Registers `ch` with the notifier. On first use this creates the `@updates`
# channel and wraps it with `mult` to build the notifier.
listen: (ch) ->
  unless @notifier?
    @updates = chan()
    @notifier = mult @updates
  @notifier.add ch
# Unregisters `ch` from the notifier; does nothing (and returns undefined)
# when `listen` has never created one.
ignore: (ch) ->
  return unless @notifier?
  @notifier.remove ch
# Returns the agent's current `@value` directly (a plain property read).
get: -> @value
Performs race-free mutation of agent state by applying `action`, either immediately as a pure function or as a long-running process, to the current state and provided `args`, then validating and attempting to compare-and-swap the yielded result value.
Returns a process whose output channel will close with the accepted result value of the applied `action`.
TODO: (1) Manual translation of `do` into generator class; (2) Error handling on side-channel.
# Applies `action` to the agent's state and returns the process created by
# `proc` that carries out the update.
#
# action  - function called as `action value, args...`; its return value (or,
#           when that is a generator, whatever is received from running it
#           via `proc`) becomes the candidate new state.
# args... - extra arguments forwarded to `action` after the current value.
#
# NOTE(review): a non-function `action` is only logged here, not rejected.
# The TypeError below is thrown and caught inside the same `try`, so
# execution continues into the process, where calling `action` will fail.
do: (action, args...) ->
  try if typeof action isnt 'function'
    console.log [action, args]
    throw new TypeError "Invalid action"
  catch e
    console.log e
    console.log e.stack
  proc => loop
    # Snapshot the state this attempt is based on; it is compared again at
    # commit time.
    value = @value
    result = action value, args...
    # A generator result is run as its own process and its output received.
    if isGenerator result
      result = yield receive proc result
    #console.log {action, args, value, result}
    # An optional validator rejects the candidate by returning a falsy value.
    if @validate?
      unless @validate result
        throw new Error "Invalid action"
    # Commit only if the state still matches the snapshot; otherwise loop.
    if @compareAndSwap value, result
      # In the visible code `@updates` is non-null only after `listen` ran.
      send.async @updates, [this, value, result] if @updates?
      return result
    else
      # NOTE(review): yields the `proc` function itself before retrying,
      # presumably to let other processes run first; confirm with prochan.
      yield proc
# Throws a bare `Error` when the agent has no recorded `@error`.
# NOTE(review): `value` and `options` are not used by the visible body; this
# looks like an unfinished stub, so confirm the intended behaviour.
recover: (value, options) ->
  if not @error?
    throw new Error
{assert} = require 'chai'
{proc, chan, send, receive, agent} = require 'prochan'
{async} = proc
{assoc, conj} = # to be replaced with (require 'mori') or similar
  # Returns a shallow copy of `map` extended with the given flat
  # `key, value, key, value, ...` list; `map` itself is not modified.
  assoc: (map, pairs...) ->
    merged = {}
    merged[name] = map[name] for name of map
    index = 0
    while index < pairs.length
      merged[pairs[index]] = pairs[index + 1]
      index += 2
    merged
  # Returns a new collection made of `coll` followed by `items`.
  conj: (coll, items...) ->
    coll.concat items
describe "Agents:", ->
describe "Data types:", ->
  it "works with arrays", async ->
    # Append three items through the agent and check the result received
    # from the process returned by `do`.
    numbers = agent [0, 1, 2]
    appended = yield receive numbers.do conj, 3, 4, 5
    assert.deepEqual appended, [0, 1, 2, 3, 4, 5]
describe "Actions:", ->
describe "as functions:", ->
  it "sends action asynchronously and returns immediately", ->
    subject = agent {key: 'value'}
    send.async subject, [assoc, 'newKey', 'newValue']
    # Right after the asynchronous send the state is still the initial map.
    assert.deepEqual subject.get(), {key: 'value'}

  it "sends, waits for agent to accept (not perform) action", async ->
    subject = agent {key: 'value'}
    yield send subject, [assoc, 'newKey', 'newValue']
    # The send has completed, yet the state is expected to be unchanged.
    assert.deepEqual subject.get(), {key: 'value'}

  it "waits for agent to perform action", async ->
    subject = agent {key: 'value'}
    outcome = yield receive subject.do assoc, 'newKey', 'newValue'
    expected =
      key: 'value'
      newKey: 'newValue'
    assert.deepEqual outcome, expected
describe "as processes:", ->