Skip to content

Instantly share code, notes, and snippets.

@lawrencejones
Last active August 29, 2015 14:17
Show Gist options
  • Save lawrencejones/808fb0422dbe80f8d687 to your computer and use it in GitHub Desktop.
Save lawrencejones/808fb0422dbe80f8d687 to your computer and use it in GitHub Desktop.
Bare bones implementation of two phase commitment

Two Phase Commit

Using a hacky network abstraction, can simulate two phase commitment by having processes decide their ability to commit as a function of their process ID.

Should hold to atomic commitment axioms...

  1. All participants read the same decision
  2. If any participants decide to commit, then all do
  3. If all participants vote 'yes', then all will decide to commit
  4. Each participant decides no more than once

Example test run...

Example running mocha atomic.coffee

_ = require('underscore')
class Network
constructor: -> @nodes = {}
add: (node) -> @nodes[node.pid] = node
broadcast: (from, payload) ->
for own pid, node of @nodes
@unicast(from, pid, payload) unless from == pid
unicast: (from, to, payload={}) ->
@nodes[to].recv(from, payload)
class Process extends require('events').EventEmitter
constructor: (@G, @pid) -> G.add(@)
send: (to, payload) ->
@G.unicast(@pid, to, payload)
broadcast: (payload) ->
@G.broadcast(@pid, payload)
recv: (from, payload) ->
@emit(payload.mtype, from, payload)
log: (args...) -> console.log "[#{@pid}] ", args...
class AtomicCommitmentProcess extends Process
constructor: ->
super
@on 'T_START', @startTransaction.bind(@)
invoke: ({ trans, canCommit, parts }) ->
for pid in parts
@send(pid, {
mtype: 'T_START',
dc: 100
trans, canCommit, parts
})
startTransaction: (from, { trans, dc, parts, canCommit }) ->
coord = parts[parts.length - 1]
committed = false
@once 'VOTE_REQUEST', sendVote = =>
@log 'RECEIVED VOTE_REQUEST'
@send(coord, { mtype: 'VOTE', vote: canCommit.call(@) })
@once 'COMMIT', commitTrans = =>
committed = true
trans.call(@)
@once 'ABORT', abortTrans = =>
@log 'ABORTING'
if coord is @pid
votes = {}
@on 'VOTE', collectVotes = (from, { vote }) =>
votes[from] = vote
@log "RECV VOTE [#{vote}] FROM [#{from}]"
if _.all(parts.map (pid) -> votes[pid] is true)
@log 'GOING TO COMMIT'
sendAbortTimeout.close()
@broadcast(mtype: 'COMMIT')
sendAbortTimeout = setTimeout =>
@log 'GOING TO ABORT'
@broadcast(mtype: 'ABORT')
, 2 * dc
@log('BROADCAST VOTE_REQUEST')
@broadcast(mtype: 'VOTE_REQUEST')
setTimeout =>
@removeListener('VOTE_REQUEST', sendVote)
@removeListener('COMMIT', commitTrans)
@removeListener('ABORT', commitTrans)
@removeListener('VOTE', collectVotes) if coord is @pid
, 4 * dc + 100
describe? 'Network G', ->
G = null
transactionLog = null
trans = ->
console.log "Running transaction on process #{@pid}"
transactionLog[@pid] = true
beforeEach ->
G = new Network()
transactionLog = {}
describe 'P1..P3', ->
beforeEach ->
new AtomicCommitmentProcess(G, i) for i in [1..3]
allHaveRun = ->
_.all([1..3].map (pid) -> transactionLog[pid])
describe 'when P2 invokes and all processes can commit', ->
it 'should run transaction on all processes', (done) ->
P2 = G.nodes[2]
P2.invoke {
trans
canCommit: -> true
parts: [1..3]
}
setTimeout ->
if allHaveRun() then done()
, 1000
describe 'when P2 invokes and process 3 can\'t commit', ->
it 'should not run transaction on any process', (done) ->
P2 = G.nodes[2]
P2.invoke {
trans
canCommit: -> @pid != 3
parts: [1..3]
}
setTimeout ->
if not _.any(_.values(transactionLog)) then done()
, 1000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment