Created
July 21, 2012 08:39
-
-
Save daschl/3155132 to your computer and use it in GitHub Desktop.
Simple Couchbase PHP 2PC Implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/**
 * Simple two-phase commit for PHP Couchbase.
 *
 * Michael Nitschinger (@daschl, 2012)
 *
 * Additional Remarks
 * ------------------
 * - The Couchbase extension currently makes it pretty hard to write easily readable code when dealing
 *   with CAS (compared to the Ruby adapter). This could certainly be improved with closures. I also found
 *   that only the getDelayed method seems to return the CAS values (and the get callback doesn't seem to
 *   work either).
 * - In the current implementation there should also be a check to see whether a transaction is currently
 *   running on either the source or the destination (or some notes should be placed to do that inside
 *   userland). This is not so much of an issue in this transfer method, but it would be more of an issue
 *   in a general transaction method where every field could be modified.
 * - Also, since the SDK doesn't raise an exception when a CAS is not valid, you always have to do that on
 *   your own. This of course makes the code more verbose too.
 * - I also found that $cb->increment doesn't create the key if it isn't found (this should be corrected in
 *   the docs or just be implemented), or maybe be supported with an optional 'create' => true as in the
 *   Ruby SDK.
 * - You can easily test this implementation by raising a TransactionException somewhere in the try block.
 */
/**
 * Raised when a single step of the two-phase commit cannot be completed,
 * for example when a CAS write is rejected.
 */
class TransactionException extends RuntimeException
{
}
/**
 * Transfers $amount points from one account document to another using a
 * two-phase commit that is tracked in a separate "transaction:<id>" document.
 *
 * The transaction document moves through the states
 * initial -> pending -> committed -> done. If any step inside the commit
 * fails, the stored state decides the rollback:
 *  - pending:   the transaction is switched to cancelling, the change is
 *               reverted on every account that still lists the transaction
 *               key, and the transaction is switched to cancelled;
 *  - committed: a new transfer in the opposite direction is started.
 *
 * @param string $source      Key of the account document that is debited.
 * @param string $destination Key of the account document that is credited.
 * @param int    $amount      Number of points to move.
 * @param object $cb          Couchbase client (passed by reference). Only get,
 *                            set, increment, getDelayed and cas are called.
 *
 * @throws TransactionException If the transaction document cannot be created,
 *                              or if a rollback step itself fails.
 * @throws Exception            If the transfer failed and was rolled back; the
 *                              original failure is attached as "previous".
 */
function transfer($source, $destination, $amount, &$cb) {
    // Helper closure: fetch a document through getDelayed so that its CAS token
    // is available. Returns the CAS only, or array(decoded document, CAS).
    // Returns null when the callback is never invoked for the key.
    $get = function($key, $casOnly = false) use (&$cb) {
        $found = null;
        $cb->getDelayed(array($key), true, function($client, $data) use (&$found, $casOnly) {
            $found = $casOnly ? $data['cas'] : array(json_decode($data['value'], true), $data['cas']);
        });
        return $found;
    };

    // Helper closure: CAS-protected write that raises instead of returning false.
    $write = function($cas, $key, $doc, $message) use (&$cb) {
        if (!$cb->cas($cas, $key, json_encode($doc))) {
            throw new TransactionException($message);
        }
    };

    // Helper closure: load an account document or fail loudly. The transaction
    // list is normalised to a 0-based list so that appending and JSON encoding
    // always produce a JSON array.
    $load = function($key, $label) use ($get) {
        $found = $get($key);
        if (!is_array($found) || !is_array($found[0])) {
            throw new TransactionException("Could not load $label document");
        }
        $pending = isset($found[0]['transactions']) && is_array($found[0]['transactions'])
            ? $found[0]['transactions']
            : array();
        $found[0]['transactions'] = array_values($pending);
        return $found;
    };

    // Prepare the transaction document.
    if ($cb->get('transaction:counter') === null) {
        $cb->set('transaction:counter', 0);
    }
    $id = $cb->increment('transaction:counter', 1);
    if ($id === false || $id === null) {
        // Without an id every transaction would share the key "transaction:".
        throw new TransactionException("Could not allocate transaction id");
    }
    $state = 'initial';
    $transKey = "transaction:$id";
    $transDoc = compact('source', 'destination', 'amount', 'state');
    $cb->set($transKey, json_encode($transDoc));
    $transactionCas = $get($transKey, true);

    // If the transaction document couldn't be stored, raise an exception.
    if (!$transactionCas) {
        throw new TransactionException("Could not insert transaction document");
    }

    // Helper closure: move the transaction document into a new state. The CAS
    // is fetched fresh unless the caller already holds a current one.
    $setState = function($newState, $message, $cas = null) use ($get, $write, $transKey, &$transDoc) {
        $transDoc['state'] = $newState;
        if ($cas === null) {
            $cas = $get($transKey, true);
        }
        $write($cas, $transKey, $transDoc, $message);
    };

    // Helper closure: undo the change on one account, but only if that account
    // still lists this transaction (i.e. the change was actually applied).
    $revert = function($key, $delta, $message) use ($get, $write, $transKey) {
        $found = $get($key);
        $doc = is_array($found) ? $found[0] : null;
        if (!is_array($doc) || !isset($doc['transactions']) || !is_array($doc['transactions'])) {
            return;
        }
        if (!in_array($transKey, $doc['transactions'], true)) {
            return;
        }
        $doc['points'] += $delta;
        // array_diff keeps the original keys; reindex so JSON stays an array.
        $doc['transactions'] = array_values(array_diff($doc['transactions'], array($transKey)));
        $write($found[1], $key, $doc, $message);
    };

    try {
        // STEP 1: Switch transaction into pending state
        $setState('pending', "Could not switch to pending state", $transactionCas);

        // STEP 2: Apply transaction to both documents
        list($sourceDoc, $sourceCas) = $load($source, 'source');
        list($destDoc, $destCas) = $load($destination, 'destination');
        $sourceDoc['points'] -= $amount;
        // Append with []: the array union operator (+) would silently drop the
        // key whenever the list already has an element at index 0.
        $sourceDoc['transactions'][] = $transKey;
        $destDoc['points'] += $amount;
        $destDoc['transactions'][] = $transKey;
        $write($sourceCas, $source, $sourceDoc, "Could not update source document");
        $write($destCas, $destination, $destDoc, "Could not update destination document");

        // STEP 3: Switch transaction into committed state
        $setState('committed', "Could not switch to committed state");

        // STEP 4: Remove transaction from documents
        list($sourceDoc, $sourceCas) = $load($source, 'source');
        list($destDoc, $destCas) = $load($destination, 'destination');
        $sourceDoc['transactions'] = array_values(array_diff($sourceDoc['transactions'], array($transKey)));
        $destDoc['transactions'] = array_values(array_diff($destDoc['transactions'], array($transKey)));
        $write($sourceCas, $source, $sourceDoc, "Could not remove transaction from source document");
        $write($destCas, $destination, $destDoc, "Could not remove transaction from destination document");

        // STEP 5: Switch transaction into done state
        $setState('done', "Could not switch to done state");
    } catch (Exception $e) {
        // Rollback: decide based on the state that was actually persisted,
        // not on the in-memory copy.
        $stored = $get($transKey);
        $storedState = null;
        if (is_array($stored) && is_array($stored[0]) && isset($stored[0]['state'])) {
            $transDoc = $stored[0];
            $storedState = $transDoc['state'];
        }

        switch ($storedState) {
            case 'committed':
                // create new transaction and swap the targets
                transfer($destination, $source, $amount, $cb);
                break;
            case 'pending':
                // STEP 1: switch transaction into cancelling state
                $setState('cancelling', "Could not switch into cancelling state");
                // STEP 2: revert changes if applied
                $revert($source, $amount, "Could not revert source document");
                $revert($destination, -$amount, "Could not revert destination document");
                // STEP 3: switch transaction into cancelled state
                $setState('cancelled', "Could not switch into cancelled state");
                break;
        }

        // Report the failure, keeping the original exception as "previous".
        throw new Exception("Transaction failed, rollback executed", 0, $e);
    }
}
// Demo: connect, seed two account documents, then move 100 points
// from karen to dipti.
$cb = new Couchbase('localhost:8091');

// Starting balance per account; each account begins with no open transactions.
$openingPoints = array(
    'karen' => 500,
    'dipti' => 700,
);
foreach ($openingPoints as $accountName => $balance) {
    $account = array(
        'name' => $accountName,
        'points' => $balance,
        'transactions' => array()
    );
    $cb->set($accountName, json_encode($account));
}

transfer('karen', 'dipti', 100, $cb);
?>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment