Skip to content

Instantly share code, notes, and snippets.

@onsails
Created March 14, 2013 19:12
Show Gist options
  • Select an option

  • Save onsails/5164258 to your computer and use it in GitHub Desktop.

Select an option

Save onsails/5164258 to your computer and use it in GitHub Desktop.
package com.onsails.pixel.spark
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.kiji.mapreduce.framework.KijiTableInputFormat
import spark.{Dependency, RDD, SerializableWritable, SparkContext, Split, TaskContext}
import org.kiji.schema.{KijiRowData, EntityId}
/**
 * Spark split that carries a single Hadoop `InputSplit` obtained from a Kiji table input format.
 *
 * @param rddId    id of the owning RDD; only used to derive `hashCode`
 * @param index    position of this split within the owning RDD's split array
 * @param rawSplit the Hadoop split itself; marked `@transient` because it is carried inside
 *                 `serializableHadoopSplit` rather than serialized directly
 */
class KijiNewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
  extends Split {

  /** The wrapped Hadoop split; read back with `.value` when the split is processed. */
  val serializableHadoopSplit: SerializableWritable[InputSplit with Writable] =
    new SerializableWritable(rawSplit)

  /** Mixes the owning RDD's id with this split's index. */
  override def hashCode(): Int = {
    val rddSeed = 41 + rddId
    rddSeed * 41 + index
  }
}
/**
 * RDD whose elements are the `(EntityId, KijiRowData)` pairs produced by the record readers
 * of a `KijiTableInputFormat`, using the new (`org.apache.hadoop.mapreduce`) Hadoop API.
 *
 * @param sc               the Spark context this RDD belongs to
 * @param inputFormatClass concrete input format, instantiated on the driver to compute splits
 *                         and again inside `compute` to create a record reader per split
 * @param conf             Hadoop configuration for the input; marked `@transient` and used only
 *                         during construction -- tasks read it back from `confBroadcast`
 */
class KijiNewHadoopRDD(
    sc: SparkContext,
    inputFormatClass: Class[_ <: KijiTableInputFormat],
    @transient conf: Configuration)
  extends RDD[(EntityId, KijiRowData)](sc)
  with HadoopMapReduceUtil {

  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
  // once instead of serializing it into every task.
  val confBroadcast = sc.broadcast(new SerializableWritable(conf))

  // Creation timestamp (yyyyMMddHHmm) used as the jobtracker identifier when building the
  // Hadoop JobID and TaskAttemptIDs below.
  private val jobtrackerId: String = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())

  @transient
  private val jobId = new JobID(jobtrackerId, id)

  // Computed once, at construction time; each raw Hadoop split is wrapped in a
  // KijiNewHadoopSplit tagged with this RDD's id and its position in the array.
  @transient
  private val splits_ : Array[Split] = {
    val inputFormat = inputFormatClass.newInstance
    inputFormat.setConf(conf)
    val jobContext = newJobContext(conf, jobId)
    val rawSplits = inputFormat.getSplits(jobContext).toArray
    Array.tabulate[Split](rawSplits.length) { i =>
      new KijiNewHadoopSplit(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }
  }

  override def splits: Array[Split] = splits_

  /**
   * Opens a record reader for `theSplit` and exposes its records as an iterator.
   * The reader is closed by a task-completion callback, not when the iterator is exhausted.
   */
  override def compute(theSplit: Split, context: TaskContext): Iterator[(EntityId, KijiRowData)] =
    new Iterator[(EntityId, KijiRowData)] {
      val split = theSplit.asInstanceOf[KijiNewHadoopSplit]
      // Intentionally shadows the constructor parameter `conf`: that one is @transient and
      // driver-only, so inside a task the configuration must come from the broadcast.
      val conf = confBroadcast.value.value
      val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
      val format = inputFormatClass.newInstance
      format.setConf(conf)
      val hadoopSplit = split.serializableHadoopSplit.value
      val reader = format.createRecordReader(hadoopSplit, hadoopAttemptContext)
      reader.initialize(hadoopSplit, hadoopAttemptContext)
      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => reader.close())

      // havePair: a record was fetched by hasNext and not yet handed out by next.
      // finished: the reader reported no more records; it is never advanced again.
      var havePair = false
      var finished = false

      override def hasNext: Boolean = {
        if (!finished && !havePair) {
          finished = !reader.nextKeyValue
          havePair = !finished
        }
        !finished
      }

      // NOTE(review): if the Kiji record reader reuses key/value objects between records,
      // callers that buffer these pairs must copy them first -- confirm for this reader.
      override def next(): (EntityId, KijiRowData) = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false
        (reader.getCurrentKey, reader.getCurrentValue)
      }
    }

  /** Hosts reported by the underlying Hadoop split, excluding the literal "localhost". */
  override def preferredLocations(split: Split): Seq[String] = {
    val theSplit = split.asInstanceOf[KijiNewHadoopSplit]
    theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
  }

  // This RDD reads directly from the input format and has no parent RDDs.
  override val dependencies: List[Dependency[_]] = Nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment