Skip to content

Instantly share code, notes, and snippets.

@maasg
Created June 16, 2014 16:22
Show Gist options
  • Select an option

  • Save maasg/68de6016bffe5e71b78c to your computer and use it in GitHub Desktop.

Select an option

Save maasg/68de6016bffe5e71b78c to your computer and use it in GitHub Desktop.
Simple Calliope Save To Cassandra Test
/**
 * Generates `total` synthetic rows and writes them to the `keyspace.raw` column family
 * via `cql3SaveToCassandra`, returning how many rows were generated for the write.
 *
 * Every row is an `Array(device_id, aggregation_type, t0, metric, vals)` of strings:
 * `device_id` is `devy<i>`, `aggregation_type` is `"aggr"`, `t0` is `"1000"`,
 * `metric` is `"sum"` and `vals` is the comma-joined range `i` to `i + 10`.
 *
 * `host` and `port` are resolved from the enclosing scope (not visible in this block).
 *
 * @param sc    Spark context used to parallelize the generated rows
 * @param total number of rows to generate; zero or a negative value generates no rows
 * @return the number of rows generated and handed to the save; equal to `total` when
 *         `total >= 0`, and 0 otherwise (previously a negative `total` was echoed back)
 */
def saveToCassandra(sc: SparkContext, total: Int): Long = {
  val cas = CasBuilder.cql3
    .withColumnFamily("keyspace", "raw")
    .onHost(host)
    .onPort(port)
    .saveWithQuery("UPDATE keyspace.raw set metric=?, vals=?")

  // Extracts the three key columns of a row. Filling a Map[String, ByteBuffer] from
  // strings depends on a String => ByteBuffer implicit conversion supplied outside this
  // block (presumably by the Calliope imports - TODO confirm).
  // A row that is not exactly five elements long fails with a MatchError.
  implicit def TsToCasKeys(src: Array[String]): Map[String, ByteBuffer] = src match {
    case Array(device_id, aggregation_type, t0, _, _) =>
      Map("device_id" -> device_id, "aggregation_type" -> aggregation_type, "t0" -> t0)
  }

  // Extracts the two values bound to the `?` placeholders of the UPDATE statement,
  // in the order they appear there (metric, then vals).
  implicit def toToCassandraVals(src: Array[String]): List[ByteBuffer] = src match {
    case Array(_, _, _, metric, vals) => List(metric, vals)
  }

  // `1 to total` is empty for total <= 0, so no rows are produced in that case.
  val entries = for (i <- 1 to total) yield {
    Array(s"devy$i", "aggr", "1000", "sum", (i to i + 10).mkString(","))
  }

  val rdd = sc.parallelize(entries, 8)
  rdd.cql3SaveToCassandra(cas)

  // Report what was actually generated rather than echoing the argument, so a
  // negative `total` yields 0 instead of a negative row count.
  entries.size.toLong
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment