Created
May 25, 2011 04:53
-
-
Save heuristicfencepost/990361 to your computer and use it in GitHub Desktop.
Experiments in interacting with Cassandra via Thrift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(import '(org.apache.thrift.transport TFramedTransport TSocket) | |
'(org.apache.thrift.protocol TBinaryProtocol) | |
'(org.apache.cassandra.thrift Cassandra$Client SliceRange SlicePredicate ColumnParent KeyRange ConsistencyLevel) | |
) | |
(defn connect
  "Connect to a Cassandra instance on the specified host and port and set the
  connection up to use the specified keyspace.

  Wraps a TSocket in a TFramedTransport, speaks TBinaryProtocol over it, opens
  the transport and returns the Cassandra$Client. The transport is left open
  on success; if selecting the keyspace fails it is closed before the
  exception is rethrown."
  [host port keyspace]
  (let [transport (TFramedTransport. (TSocket. host port))
        protocol  (TBinaryProtocol. transport)
        client    (Cassandra$Client. protocol)]
    (.open transport)
    (try
      (.set_keyspace client keyspace)
      client
      (catch Exception e
        ;; Nobody else holds a reference to the transport at this point, so
        ;; release the socket here rather than leaking it.
        (.close transport)
        (throw e)))))
(defn get_range_slices
  "Simple front end into the get_range_slices function exposed via Thrift.

  client    - a connected Cassandra$Client
  cf        - column family name, used to build the ColumnParent
  start/end - strings whose bytes become the start and end keys of the KeyRange

  The column SliceRange is built with empty start and finish byte arrays,
  reversed set to false and a count of 100. The call is made at
  ConsistencyLevel/ONE and its result is returned unchanged."
  [client cf start end]
  (let [slice-range   (doto (SliceRange.)
                        (.setStart (byte-array 0))
                        (.setFinish (byte-array 0))
                        (.setReversed false)
                        (.setCount 100))
        predicate     (doto (SlicePredicate.)
                        (.setSlice_range slice-range))
        column-parent (ColumnParent. cf)
        key-range     (doto (KeyRange.)
                        (.setStart_key (.getBytes start))
                        (.setEnd_key (.getBytes end)))]
    (.get_range_slices client column-parent predicate key-range ConsistencyLevel/ONE)))
(defn range_slices_keys
  "Retrieve the keys in a get_range_slices result.

  Returns a lazy sequence with one String per element of slices, built from
  the bytes returned by that element's getKey."
  [slices]
  (map #(String. (.getKey %)) slices))
(defn range_slices_columns
  "Retrieve a map of the columns associated with the specified key in a
  get_range_slices result.

  Finds the first element of slices whose key bytes, read as a String, equal
  key. Returns nil when there is no such element; otherwise returns a map from
  each column name (as a keyword) to its value (as a String)."
  [slices key]
  ;; when-let yields nil when no slice matches, replacing the original
  ;; (cond (nil? match) nil (true? true) ...) construction.
  (when-let [match (first (filter #(= key (String. (.getKey %))) slices))]
    (let [cols (map #(.getColumn %) (.getColumns match))]
      (zipmap (map #(keyword (String. (.getName %))) cols)
              (map #(String. (.getValue %)) cols)))))
;; Demo: connect to a local Cassandra node, run a range slice query against the
;; "authors" column family in the "twitter" keyspace and print the first five
;; keys together with their name and following columns.
(let [client (connect "localhost" 9160 "twitter")]
  (try
    (let [key_slices (get_range_slices client "authors" "!" "~")
          five_keys  (take 5 (range_slices_keys key_slices))
          format-key (fn [key]
                       (let [cols (range_slices_columns key_slices key)]
                         ;; Keyword-first lookup returns nil instead of
                         ;; throwing when cols itself is nil.
                         (format "Key %s: name => %s, following => %s\n"
                                 key (:name cols) (:following cols))))]
      (print five_keys)
      ;; apply str concatenates in a single pass instead of re-copying the
      ;; accumulated string for every element as reduce str does.
      (print (apply str (map format-key five_keys))))
    (finally
      ;; connect leaves the transport open; reach it through the client's
      ;; input protocol and release the socket once the demo is done.
      (.. client getInputProtocol getTransport close))))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.fencepost.cassandra | |
import scala.collection.JavaConversions | |
import org.apache.thrift.protocol.TBinaryProtocol | |
import org.apache.thrift.transport._ | |
import org.apache.cassandra.service._ | |
import org.apache.cassandra.thrift._ | |
/**
 * Small helpers for talking to Cassandra through its Thrift client, plus a
 * demo `main` that prints a handful of rows from a column family.
 */
object ThriftCassandraClient {

  // Explicit `.asScala` decorators replace the deprecated JavaConversions helpers.
  // Imported locally so this object does not depend on the file-level import list.
  import scala.collection.JavaConverters._

  // Charset used for every String <-> byte[] conversion below, so results do not
  // depend on the platform default encoding.
  private val Utf8 = "UTF-8"

  /**
   * Open a framed Thrift connection and select a keyspace.
   *
   * @param host     host name passed to the TSocket
   * @param port     port passed to the TSocket
   * @param keyspace keyspace selected via `set_keyspace`
   * @return the connected client; its transport is left open on success
   */
  def connect(host: String, port: Int, keyspace: String): Cassandra.Client = {
    val transport = new TFramedTransport(new TSocket(host, port))
    val client = new Cassandra.Client(new TBinaryProtocol(transport))
    transport.open()
    try {
      client.set_keyspace(keyspace)
      client
    } catch {
      case e: Exception =>
        // Nothing else references the transport yet; close it rather than leak it.
        transport.close()
        throw e
    }
  }

  /**
   * Execute a range slice query against the specified Cassandra instance.
   *
   * The column slice range is built with empty start and finish byte arrays,
   * reversed set to false and a count of 100; the call is made at
   * ConsistencyLevel.ONE.
   *
   * @param client connected Thrift client
   * @param cf     column family name used to build the ColumnParent
   * @param start  string whose UTF-8 bytes become the start key of the KeyRange
   * @param end    string whose UTF-8 bytes become the end key of the KeyRange
   * @return the returned key slices, suitable for later interrogation by
   *         range_slices_keys() or range_slices_columns()
   */
  def get_range_slices(client: Cassandra.Client, cf: String, start: String, end: String): Iterable[KeySlice] = {
    val sliceRange = new SliceRange()
    sliceRange.setStart(new Array[Byte](0))
    sliceRange.setFinish(new Array[Byte](0))
    sliceRange.setReversed(false)
    sliceRange.setCount(100)

    val slicePredicate = new SlicePredicate()
    slicePredicate.setSlice_range(sliceRange)

    val columnParent = new ColumnParent(cf)

    val keyRange = new KeyRange()
    keyRange.setStart_key(start.getBytes(Utf8))
    keyRange.setEnd_key(end.getBytes(Utf8))

    val javaSlices = client.get_range_slices(columnParent, slicePredicate, keyRange, ConsistencyLevel.ONE)
    // The Thrift Java client hands back a java.util.List; copy it into an
    // immutable Scala List so callers get a plain Scala collection.
    javaSlices.asScala.toList
  }

  /** Return the key of every slice in a query result, decoded as a UTF-8 String. */
  def range_slices_keys(slices: Iterable[KeySlice]): Iterable[String] =
    slices.map(slice => new String(slice.getKey, Utf8))

  /**
   * Look up the columns stored under `key` in a query result.
   *
   * @param slices result of get_range_slices()
   * @param key    row key to look for
   * @return None if no slice has that key, otherwise a map of column names to
   *         column values (both decoded as UTF-8 Strings)
   */
  def range_slices_columns(slices: Iterable[KeySlice], key: String): Option[Map[String, String]] =
    slices
      .find(slice => new String(slice.getKey, Utf8) == key)
      .map { slice =>
        slice.getColumns.asScala
          .map(_.getColumn)
          .map(col => new String(col.getName, Utf8) -> new String(col.getValue, Utf8))
          .toMap
      }

  /**
   * Demo entry point: connect to a local node, query the "authors" column
   * family of the "twitter" keyspace and print the first five keys with their
   * "name" and "following" columns (empty string when a column is absent).
   */
  def main(args: Array[String]): Unit = {
    val client = connect("localhost", 9160, "twitter")
    try {
      val slices = get_range_slices(client, "authors", "!", "~")
      val fivekeys = range_slices_keys(slices).take(5)
      println("fivekeys: " + fivekeys)
      for (key <- fivekeys) {
        // Keys without a matching slice are skipped silently, as before.
        range_slices_columns(slices, key).foreach { cols =>
          println(
            "Key " + key +
              ": name => " + cols.getOrElse("name", "") +
              ", following => " + cols.getOrElse("following", "")
          )
        }
      }
    } finally {
      // connect() leaves the transport open; reach it through the client's
      // input protocol and release the socket when the demo finishes.
      client.getInputProtocol.getTransport.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment