|
_ = require('underscore') |
|
|
|
class Network |
|
|
|
constructor: -> @nodes = {} |
|
|
|
add: (node) -> @nodes[node.pid] = node |
|
|
|
broadcast: (from, payload) -> |
|
for own pid, node of @nodes |
|
@unicast(from, pid, payload) unless from == pid |
|
|
|
unicast: (from, to, payload={}) -> |
|
@nodes[to].recv(from, payload) |
|
|
|
class Process extends require('events').EventEmitter |
|
|
|
constructor: (@G, @pid) -> G.add(@) |
|
|
|
send: (to, payload) -> |
|
@G.unicast(@pid, to, payload) |
|
|
|
broadcast: (payload) -> |
|
@G.broadcast(@pid, payload) |
|
|
|
recv: (from, payload) -> |
|
@emit(payload.mtype, from, payload) |
|
|
|
log: (args...) -> console.log "[#{@pid}] ", args... |
|
|
|
class AtomicCommitmentProcess extends Process |
|
|
|
constructor: -> |
|
super |
|
@on 'T_START', @startTransaction.bind(@) |
|
|
|
invoke: ({ trans, canCommit, parts }) -> |
|
for pid in parts |
|
@send(pid, { |
|
mtype: 'T_START', |
|
dc: 100 |
|
trans, canCommit, parts |
|
}) |
|
|
|
startTransaction: (from, { trans, dc, parts, canCommit }) -> |
|
|
|
coord = parts[parts.length - 1] |
|
committed = false |
|
|
|
@once 'VOTE_REQUEST', sendVote = => |
|
@log 'RECEIVED VOTE_REQUEST' |
|
@send(coord, { mtype: 'VOTE', vote: canCommit.call(@) }) |
|
|
|
@once 'COMMIT', commitTrans = => |
|
committed = true |
|
trans.call(@) |
|
|
|
@once 'ABORT', abortTrans = => |
|
@log 'ABORTING' |
|
|
|
if coord is @pid |
|
|
|
votes = {} |
|
|
|
@on 'VOTE', collectVotes = (from, { vote }) => |
|
votes[from] = vote |
|
@log "RECV VOTE [#{vote}] FROM [#{from}]" |
|
if _.all(parts.map (pid) -> votes[pid] is true) |
|
@log 'GOING TO COMMIT' |
|
sendAbortTimeout.close() |
|
@broadcast(mtype: 'COMMIT') |
|
|
|
sendAbortTimeout = setTimeout => |
|
@log 'GOING TO ABORT' |
|
@broadcast(mtype: 'ABORT') |
|
, 2 * dc |
|
|
|
@log('BROADCAST VOTE_REQUEST') |
|
@broadcast(mtype: 'VOTE_REQUEST') |
|
|
|
setTimeout => |
|
@removeListener('VOTE_REQUEST', sendVote) |
|
@removeListener('COMMIT', commitTrans) |
|
@removeListener('ABORT', commitTrans) |
|
@removeListener('VOTE', collectVotes) if coord is @pid |
|
, 4 * dc + 100 |
|
|
|
describe? 'Network G', -> |
|
|
|
G = null |
|
transactionLog = null |
|
trans = -> |
|
console.log "Running transaction on process #{@pid}" |
|
transactionLog[@pid] = true |
|
|
|
beforeEach -> |
|
G = new Network() |
|
transactionLog = {} |
|
|
|
describe 'P1..P3', -> |
|
|
|
beforeEach -> |
|
new AtomicCommitmentProcess(G, i) for i in [1..3] |
|
|
|
allHaveRun = -> |
|
_.all([1..3].map (pid) -> transactionLog[pid]) |
|
|
|
describe 'when P2 invokes and all processes can commit', -> |
|
|
|
it 'should run transaction on all processes', (done) -> |
|
P2 = G.nodes[2] |
|
P2.invoke { |
|
trans |
|
canCommit: -> true |
|
parts: [1..3] |
|
} |
|
setTimeout -> |
|
if allHaveRun() then done() |
|
, 1000 |
|
|
|
describe 'when P2 invokes and process 3 can\'t commit', -> |
|
|
|
it 'should not run transaction on any process', (done) -> |
|
P2 = G.nodes[2] |
|
P2.invoke { |
|
trans |
|
canCommit: -> @pid != 3 |
|
parts: [1..3] |
|
} |
|
setTimeout -> |
|
if not _.any(_.values(transactionLog)) then done() |
|
, 1000 |
|
|
|
|