Created March 14, 2013 19:12
-
-
Save onsails/5164258 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.onsails.pixel.spark | |
| import java.text.SimpleDateFormat | |
| import java.util.Date | |
| import org.apache.hadoop.conf.Configuration | |
| import org.apache.hadoop.io.Writable | |
| import org.apache.hadoop.mapreduce._ | |
| import org.kiji.mapreduce.framework.KijiTableInputFormat | |
| import spark.{Dependency, RDD, SerializableWritable, SparkContext, Split, TaskContext} | |
| import org.kiji.schema.{KijiRowData, EntityId} | |
/**
 * Spark `Split` wrapping a single Hadoop `InputSplit` produced by a Kiji table input format.
 *
 * The raw split is `@transient`; only the `SerializableWritable` wrapper stored in
 * `serializableHadoopSplit` is kept as a field of this object.
 *
 * @param rddId    id of the RDD that owns this split; mixed into `hashCode`
 * @param index    position of this split in the owning RDD's split array
 * @param rawSplit the underlying Hadoop input split (must also be a `Writable`)
 */
class KijiNewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
    extends Split {

  /** Wrapper around `rawSplit`; `.value` gives back the Hadoop split. */
  val serializableHadoopSplit: SerializableWritable[InputSplit with Writable] =
    new SerializableWritable(rawSplit)

  override def hashCode(): Int = {
    val seeded = 41 + rddId
    seeded * 41 + index
  }
}
/**
 * RDD of `(EntityId, KijiRowData)` pairs read through a Kiji table `InputFormat` using the
 * new (`org.apache.hadoop.mapreduce`) Hadoop API.
 *
 * Splits are computed eagerly on the driver when the RDD is constructed. Each task
 * re-instantiates the input format and opens a record reader for its own split.
 *
 * @param sc               Spark context that owns this RDD; also used to broadcast `conf`
 * @param inputFormatClass Kiji input format class, instantiated reflectively on the driver
 *                         (to compute splits) and again inside every task (to read records)
 * @param conf             Hadoop configuration handed to the input format. It is `@transient`;
 *                         tasks obtain it from `confBroadcast` instead.
 */
class KijiNewHadoopRDD(
    sc: SparkContext,
    inputFormatClass: Class[_ <: KijiTableInputFormat],
    @transient conf: Configuration)
  extends RDD[(EntityId, KijiRowData)](sc)
  with HadoopMapReduceUtil {

  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
  val confBroadcast = sc.broadcast(new SerializableWritable(conf))

  // Minute-resolution timestamp used as the job-tracker identifier of the synthetic
  // Hadoop JobID / TaskAttemptIDs created below.
  private val jobtrackerId: String =
    new SimpleDateFormat("yyyyMMddHHmm").format(new Date())

  @transient
  private val jobId = new JobID(jobtrackerId, id)

  // One KijiNewHadoopSplit per Hadoop InputSplit, indexed in the order getSplits returns them.
  @transient
  private val splits_ : Array[Split] = {
    val inputFormat = inputFormatClass.newInstance
    inputFormat.setConf(conf)
    val rawSplits = inputFormat.getSplits(newJobContext(conf, jobId)).toArray
    Array.tabulate[Split](rawSplits.length) { i =>
      new KijiNewHadoopSplit(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }
  }

  override def splits: Array[Split] = splits_

  /**
   * Opens a Kiji record reader for `theSplit` and exposes its records as an iterator.
   * The reader is closed by a task-completion callback registered with `context`.
   */
  override def compute(theSplit: Split, context: TaskContext): Iterator[(EntityId, KijiRowData)] =
    new Iterator[(EntityId, KijiRowData)] {
      private val split = theSplit.asInstanceOf[KijiNewHadoopSplit]
      // Deliberately read from the broadcast: the constructor's `conf` is @transient.
      private val hadoopConf = confBroadcast.value.value
      // Constructor arguments follow Hadoop's
      // TaskAttemptID(jtIdentifier, jobId, isMap, taskId, id).
      private val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
      private val hadoopAttemptContext = newTaskAttemptContext(hadoopConf, attemptId)

      private val reader = {
        val format = inputFormatClass.newInstance
        format.setConf(hadoopConf)
        val hadoopSplit = split.serializableHadoopSplit.value
        val recordReader = format.createRecordReader(hadoopSplit, hadoopAttemptContext)
        recordReader.initialize(hadoopSplit, hadoopAttemptContext)
        recordReader
      }

      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => reader.close())

      // `havePair`: the reader is positioned on a record that next() has not returned yet.
      // `finished`: latched once nextKeyValue reports no more records.
      private var havePair = false
      private var finished = false

      override def hasNext: Boolean = {
        if (!finished && !havePair) {
          finished = !reader.nextKeyValue
          havePair = !finished
        }
        !finished
      }

      override def next(): (EntityId, KijiRowData) = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false
        (reader.getCurrentKey, reader.getCurrentValue)
      }
    }

  // Hosts reported by the Hadoop split, excluding the literal "localhost".
  override def preferredLocations(split: Split) = {
    val theSplit = split.asInstanceOf[KijiNewHadoopSplit]
    theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
  }

  // Source RDD: it has no parent RDDs.
  override val dependencies: List[Dependency[_]] = Nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.