Created
April 20, 2012 06:02
-
-
Save groundwater/2426469 to your computer and use it in GitHub Desktop.
Coprocessor Transactions in HBase and Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** This builds a transactional layer into HBase via coprocessors. | |
* | |
* Transactions can be performed by sending a group of mutations, | |
* to a coprocessor. The coprocessor modifies the table using a | |
* special mutation protocol that ensures anyone using the same | |
* protocol will only view atomic commits. | |
* | |
* Querying the table directly does not guaruntee atomicity. You | |
* _must_ query using the provided protocol. | |
* | |
* The protocol guaruntees that, should a crash occur, partial | |
* transactions can be recovered. | |
* | |
* The goal of this protocol is _not_ performance. It is an extra | |
* layer of guaruntees should they be required. | |
* | |
* ## Mutation Protocol ## | |
* | |
* The mutation protocol is as follows: | |
* | |
* 1. The entire transaction is pre-written | |
* | |
* The transaction object is encoded as a byte stream and recorded | |
* in a special row `transaction.<timestamp>.uid`. Column `buffer` | |
* encodes the transaction object. The column `status` encodes the | |
* global status of the transaction. | |
* | |
* Set status to `0-WRITTEN` | |
* | |
* 2. Lock all keys listed by mutations or assertions | |
* | |
* Keys are locked by a checkAndPut to a special lock column. | |
* Given any key `x` the lock column is `x_lock`. | |
* Any value means the column is locked. The tag value should be | |
* the uid of the transactions row. Transactions must respect existing | |
* locks and either wait or abort in such a case. | |
* | |
* Set the global status to `1-LOCKED` | |
* | |
* 3. Run all assertions | |
* | |
* Remember that any asserted cells must be locked in the previous | |
* stage. Assertions that fail must abort the transaction. | |
* | |
* Set the global status to `2-ASSERTED` | |
* | |
* 4. Mutate values | |
* | |
* By now, if the transaction should halt due to a crash, recovery | |
* software should complete the transaction during recovery. | |
* | |
* Since HBase never over-writes old values, data is never really lost | |
* except during a compactions. Just for safety, tag the old version | |
* by putting its timestamp into `x_tag`. | |
* | |
* Put the new value. | |
* | |
* Once all values are inserted, set the global status to `3-PUT` | |
* | |
* 5. Cleanup | |
* | |
* The transaction is done, but the safety measures must be cleaned | |
* up gracefully. | |
* | |
* Clear all `x_tag` values then set the global status to `4-CLEANING`. | |
* | |
* 6. Unlock | |
* | |
* Unlock all the cells. An unloced cell sboudl be immediately available | |
* for another transaction, even before the rest of the cells are unlocked. | |
* | |
* After all tags are unlocked, set the global status to `5-COMPLETE` | |
* | |
* Cleanup of transaction rows can be done at the discression of the administrators. | |
* | |
* ## Discussion ## | |
* | |
* I'm not sure if the `x_tag` is necessary. Certainly HBase makes it easier | |
* to recover from transactions by never deleting old data. | |
* | |
* I'd like to point out again that this is not intended to be a high-performance | |
* addition to HBase. Rather, should you build 90% of your application in HBase | |
* then decide a small piece needs some transactional security, this is an easy | |
* option to throw in the mix. Don't use it unless it's necessary. | |
* | |
* I think any good implementation will require a `fsck` that can be run | |
* post crash. That should be added into the coprocessor at some point. | |
* | |
*/ | |
package ca.underflow.hbase | |
import org.apache.hadoop.hbase.client._ | |
import org.apache.hadoop.hbase.coprocessor._ | |
import org.apache.hadoop.hbase.ipc._ | |
import org.apache.hadoop.hbase.util._ | |
case class Key( | |
row: Array[Byte], | |
fam: Array[Byte], | |
col: Array[Byte]) | |
object Key { | |
// Convenience Constructor | |
def apply(row: String, fam: String, col: String): Key = { | |
Key(Bytes.toBytes(row), | |
Bytes.toBytes(fam), | |
Bytes.toBytes(col)) | |
} | |
} | |
case class Value(bytes: Array[Byte]) | |
object Value { | |
// Convenience Constructor | |
def apply(value: String): Value = { | |
Value(Bytes.toBytes(value)) | |
} | |
} | |
case class Mutation(key: Key, value: Value) | |
// Assertions verify existing data pre-commit. | |
// Failed assertions *must* abort the transaction. | |
trait Assertion { | |
def assertOn(table: HTableInterface) | |
} | |
case class Exact(key: Key, value: Value) extends Assertion | |
case class Some(key: Key) extends Assertion | |
case class GreaterThan(key: Key, value: Value) extends Assertion | |
case class Not(assertion: Assertion) extends Assertion | |
case class And(left: Assertion, right: Assertion) extends Assertion | |
case class Or(left: Assertion, right: Assertion) extends Assertion | |
// RPC Interface to Coprocessor | |
// Keep it simple :) | |
trait Transactional extends CoprocessorProtocol { | |
implicit def assertions: List[Assertion] | |
// The coprocessor commits a set of mutations atomically | |
def commit(mutations: Iterable[Mutation]) | |
def commit(mutations: Iterable[Mutation], assertions: Iterable[Assertion]) | |
} | |
// Basic Coprocessor | |
class TransactionalBasic | |
extends BaseEndpointCoprocessor | |
with CoprocessorProtocol { | |
def commit(mutations: Iterable[Mutation]) {} | |
def commit(mutations: Iterable[Mutation], assertions: Iterable[Assertion]) {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment