Skip to content

Instantly share code, notes, and snippets.

@daschl
Created July 21, 2012 08:39
Show Gist options
  • Save daschl/3155132 to your computer and use it in GitHub Desktop.
Save daschl/3155132 to your computer and use it in GitHub Desktop.
Simple Couchbase PHP 2PC Implementation
<?php
/**
* Simple two-phase commit for PHP couchbase.
*
* Michael Nitschinger (@daschl, 2012)
*
* Additional Remarks
* ------------------
* - The Couchbase extension makes it currently pretty hard to write easy readable code when dealing with
* CAS (compared to the ruby adapter). This could certainly be improved with closures. I also found that
* only the getDelayed method seems to return the CAS values (and the get callback doesnt seem to work
* either.
* - In the current implementation there should also be a check to see if a transaction is currently running
* on both source or destination (or some notes should be placed to do that inside userland). This is
* not so much a case in this transfer method but would be more an issue in a general transaction method
* where every field could be modified.
* - Also, since the SDK doesn't raise an exception when a CAS is not valid, you have to do that on your own
* always. This of course makes it more verbose too.
* - I also found that $cb->increment doesnt create the key if it isnt found (this should be corrected in the
* docs or just be implemented). Or maqybe with an optional 'create' => true as in the ruby sdk.
* - You can easily test this implementation by raising a TransactionException somewhere in the try block.
*/
class TransactionException extends RuntimeException {}
function transfer($source, $destination, $amount, &$cb) {
// helper closure to return the cas and other values more easily
$get = function($key, $casOnly = false) use (&$cb) {
$return = null;
$cb->getDelayed(array($key), true, function($cb, $data) use(&$return, $casOnly) {
$return = $casOnly ? $data['cas'] : array(json_decode($data['value'], true), $data['cas']);
});
return $return;
};
// prepare transaction document
if($cb->get('transaction:counter') === null) {
$cb->set('transaction:counter', 0);
}
$id = $cb->increment('transaction:counter', 1);
$state = 'initial';
$transKey = "transaction:$id";
$transDoc = compact('source', 'destination', 'amount', 'state');
$cb->set($transKey, json_encode($transDoc));
$transactionCas = $get($transKey, true);
// if the transaction document couldnt be stored, return an exception.
if(!$transactionCas) {
throw new TransactionException("Could not insert transaction document");
}
try {
// STEP 1: Switch transaction into pending state
$transDoc['state'] = 'pending';
if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
throw new TransactionException("Could not switch to pending state");
}
// STEP 2: Apply transaction to both documents
list($sourceDoc, $sourceCas) = $get($source);
list($destDoc, $destCas) = $get($destination);
$sourceDoc['points'] -= $amount;
$sourceDoc['transactions'] += array($transKey);
$destDoc['points'] += $amount;
$destDoc['transactions'] += array($transKey);
if(!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
throw new TransactionException("Could not update source document");
}
if(!$cb->cas($destCas, $destination, json_encode($destDoc))) {
throw new TransactionException("Could not update destination document");
}
// STEP 3: Switch transactions into commited state
$transDoc['state'] = 'committed';
$transactionCas = $get($transKey, true);
if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
throw new TransactionException("Could not switch to committed state");
}
// STEP 4: Remove transaction from documents
list($sourceDoc, $sourceCas) = $get($source);
list($destDoc, $destCas) = $get($destination);
$sourceDoc['transactions'] = array_diff($sourceDoc['transactions'], array($transKey));
$destDoc['transactions'] = array_diff($destDoc['transactions'], array($transKey));
if(!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
throw new TransactionException("Could not remove transaction from source document");
}
if(!$cb->cas($destCas, $destination, json_encode($destDoc))) {
throw new TransactionException("Could not remove transaction from destination document");
}
// STEP 5: Switch transaction into done state
$transDoc['state'] = 'done';
$transactionCas = $get($transKey, true);
if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
throw new TransactionException("Could not switch to done state");
}
} catch(Exception $e) {
// Rollback transaction
list($transDoc, $transCas) = $get($transKey);
switch($transDoc['state']) {
case 'committed':
// create new transaction and swap the targets
transfer($destination, $source, $amount, $cb);
break;
case 'pending':
// STEP 1: switch transaction into cancelling state
$transDoc['state'] = 'cancelling';
$transactionCas = $get($transKey, true);
if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
throw new TransactionException("Could not switch into cancelling state");
}
// STEP 2: revert changes if applied
list($sourceDoc, $sourceCas) = $get($source);
list($destDoc, $destCas) = $get($destination);
if(in_array($transKey, $sourceDoc['transactions'])) {
$sourceDoc['points'] += $amount;
$sourceDoc['transactions'] = array_diff($sourceDoc['transactions'], array($transKey));
if(!$cb->cas($sourceCas, $source, json_encode($sourceDoc))) {
throw new TransactionException("Could not revert source document");
}
}
if(in_array($transKey, $destDoc['transactions'])) {
$destDoc['points'] -= $amount;
$destDoc['transactions'] = array_diff($destDoc['transactions'], array($transKey));
if(!$cb->cas($destCas, $destination, json_encode($destDoc))) {
throw new TransactionException("Could not revert destination document");
}
}
// STEP 3: switch transaction into cancelled state
$transDoc['state'] = 'cancelled';
$transactionCas = $get($transKey, true);
if(!$cb->cas($transactionCas, $transKey, json_encode($transDoc))) {
throw new TransactionException("Could not switch into cancelled state");
}
break;
}
// Rethrow the original exception
throw new Exception("Transaction failed, rollback executed", null, $e);
}
}
$cb = new Couchbase('localhost:8091');
$cb->set('karen', json_encode(array(
'name' => 'karen',
'points' => 500,
'transactions' => array()
)));
$cb->set('dipti', json_encode(array(
'name' => 'dipti',
'points' => 700,
'transactions' => array()
)));
transfer('karen', 'dipti', 100, $cb);
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment