PySpark / Hadoop InputFormat interop
// Python RDD creation functions

// SequenceFile converted to Text and then to String
def sequenceFileAsText(path: String) = {
  implicit val kcm = ClassManifest.fromClass(classOf[Text])
  implicit val fcm = ClassManifest.fromClass(classOf[SequenceFileAsTextInputFormat])
  new JavaPairRDD(sc
    .newAPIHadoopFile[Text, Text, SequenceFileAsTextInputFormat](path)
    .map { case (k, v) => (k.toString, v.toString) }
  )
}

// Arbitrary Hadoop InputFormat, key class and value class converted to String
def newHadoopFileAsText[K, V](path: String,
                              inputFormatClazz: String,
                              keyClazz: String,
                              valueClazz: String,
                              keyWrapper: String,
                              valueWrapper: String): JavaPairRDD[String, String] = {
  newHadoopFileFromClassNames(path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
    .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) })
}

private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](
    path: String,
    inputFormatClazz: String,
    keyClazz: String,
    valueClazz: String,
    keyWrapper: String,
    valueWrapper: String): JavaPairRDD[K, V] = {
  implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
  implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
  implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz))
  val kc = kcm.erasure.asInstanceOf[Class[K]]
  val vc = vcm.erasure.asInstanceOf[Class[V]]
  val fc = fcm.erasure.asInstanceOf[Class[F]]
  new JavaPairRDD[K, V](sc.newAPIHadoopFile(path, fc, kc, vc))
}
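
These methods live on the JVM side and are reached from Python through the Py4J gateway handle that PySpark's SparkContext keeps as _jsc; the thin wrappers in the last file below do exactly this. A one-line sketch of the equivalent direct call:

jrdd = sc._jsc.sequenceFileAsText("/tmp/data/test")  # raw JavaPairRDD handle, not yet a Python RDD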
Python 2.7.5 (default, Jun 20 2013, 11:06:30)
Type "copyright", "credits" or "license" for more information.

IPython 1.0.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

log4j:WARN No appenders could be found for logger (org.apache.spark.util.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 0.9.0-SNAPSHOT
      /_/

Using Python version 2.7.5 (default, Jun 20 2013 11:06:30)
Spark context available as sc.

In [1]: r1 = sc.newHadoopFileAsText("/tmp/data/test",
                                    "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
                                    "org.apache.hadoop.io.IntWritable",
                                    "org.apache.hadoop.io.IntWritable")
2013-10-30 12:42:17.310 java[11249:4003] Unable to load realm info from SCDynamicStore

In [2]: r1.collect()
Out[2]: [(u'1', u'2'), (u'2', u'3'), (u'3', u'4')]

In [3]: r2 = sc.sequenceFileAsText("/tmp/data/test")

In [4]: r2.collect()
Out[4]: [(u'1', u'2'), (u'2', u'3'), (u'3', u'4')]
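
Both helpers stringify keys and values via toString(), so the IntWritable pairs above come back to Python as unicode strings, not ints; any re-parsing is left to the caller. A usage sketch (variable names are illustrative):

pairs = sc.sequenceFileAsText("/tmp/data/test")
ints = pairs.map(lambda kv: (int(kv[0]), int(kv[1])))
ints.collect()  # [(1, 2), (2, 3), (3, 4)]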
def writeAsPickle(elem: Any, dOut: DataOutputStream) {
  if (elem.isInstanceOf[Array[Byte]]) {
    // Already-pickled payload: frame it with a 4-byte length and pass through
    val arr = elem.asInstanceOf[Array[Byte]]
    dOut.writeInt(arr.length)
    dOut.write(arr)
  } else if (elem.isInstanceOf[scala.Tuple2[Any, Any]]) {
    val tuple = elem.asInstanceOf[scala.Tuple2[Any, Any]]
    if (tuple._1.isInstanceOf[Array[Byte]] && tuple._2.isInstanceOf[Array[Byte]]) {
      // Pair of pickled payloads: splice them into a single pickled 2-tuple
      val t = tuple.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
      // stripPickle() removes 3 bytes (PROTO, TWO, STOP) from each payload;
      // +4 for the PROTO, TWO, TUPLE2 and STOP opcodes written below
      val length = t._1.length + t._2.length - 3 - 3 + 4
      dOut.writeInt(length)
      dOut.writeByte(Pickle.PROTO)
      dOut.writeByte(Pickle.TWO)
      dOut.write(PythonRDD.stripPickle(t._1))
      dOut.write(PythonRDD.stripPickle(t._2))
      dOut.writeByte(Pickle.TUPLE2)
      dOut.writeByte(Pickle.STOP)
    } else if (tuple._1.isInstanceOf[String] && tuple._2.isInstanceOf[String]) {
      // Pair of strings: hand-roll a protocol-2 pickle of a unicode 2-tuple
      val t = tuple.asInstanceOf[scala.Tuple2[String, String]]
      val tb = (t._1.getBytes("UTF-8"), t._2.getBytes("UTF-8"))
      // 6 opcode bytes (PROTO, TWO, 2x BINUNICODE, TUPLE2, STOP), plus a
      // 4-byte length before each string's bytes
      val length = 6 + tb._1.length + 4 + tb._2.length + 4
      dOut.writeInt(length)
      dOut.writeByte(Pickle.PROTO)
      dOut.writeByte(Pickle.TWO)
      dOut.write(Pickle.BINUNICODE)
      // BINUNICODE lengths are little-endian; DataOutputStream writes big-endian
      dOut.writeInt(Integer.reverseBytes(tb._1.length))
      dOut.write(tb._1)
      dOut.write(Pickle.BINUNICODE)
      dOut.writeInt(Integer.reverseBytes(tb._2.length))
      dOut.write(tb._2)
      dOut.writeByte(Pickle.TUPLE2)
      dOut.writeByte(Pickle.STOP)
    } else {
      throw new SparkException("Unexpected RDD type")
    }
  } else if (elem.isInstanceOf[String]) {
    // For uniformity, strings are wrapped into Pickles.
    val s = elem.asInstanceOf[String].getBytes("UTF-8")
    // PROTO+TWO (2) + BINUNICODE (1) + 4-byte length + bytes + STOP (1)
    val length = 2 + 1 + 4 + s.length + 1
    dOut.writeInt(length)
    dOut.writeByte(Pickle.PROTO)
    dOut.writeByte(Pickle.TWO)
    dOut.write(Pickle.BINUNICODE)
    dOut.writeInt(Integer.reverseBytes(s.length))
    dOut.write(s)
    dOut.writeByte(Pickle.STOP)
  } else {
    throw new SparkException("Unexpected RDD type")
  }
}
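
The hand-rolled byte layout is easy to check against CPython itself: for a pair of strings, writeAsPickle emits the same opcode stream that pickle protocol 2 produces for a 2-tuple of unicode strings, minus the memoization (BINPUT) opcodes, which pickletools.optimize also strips. A verification sketch (not part of the gist; the commented output is roughly what Python 2.7 prints):

import pickle, pickletools

data = pickletools.optimize(pickle.dumps((u'1', u'2'), 2))
pickletools.dis(data)
#     0: \x80 PROTO      2
#     2: X    BINUNICODE u'1'
#     8: X    BINUNICODE u'2'
#    14: \x86 TUPLE2
#    15: .    STOP
# len(data) == 16, matching the length computed above: 6 + 1 + 4 + 1 + 4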
def sequenceFileAsText(self, name):
    """
    Read a Hadoop SequenceFile with arbitrary key and value class from HDFS,
    a local file system (available on all nodes), or any Hadoop-supported file
    system URI, and return it as an RDD of (String, String), where the key and
    value representations are generated using the 'toString()' method of the
    relevant Java class.
    """
    #minSplits = minSplits or min(self.defaultParallelism, 2)
    jrdd = self._jsc.sequenceFileAsText(name)
    return RDD(jrdd, self)

def newHadoopFileAsText(self, name, inputFormat, keyClass, valueClass, keyWrap="", valueWrap="", minSplits=None):
    """
    Read a Hadoop file with arbitrary InputFormat, key and value class from HDFS,
    a local file system (available on all nodes), or any Hadoop-supported file
    system URI, and return it as an RDD of (String, String), where the key and
    value representations are generated using the 'toString()' method of the
    relevant Java class.
    """
    #minSplits = minSplits or min(self.defaultParallelism, 2)
    jrdd = self._jsc.newHadoopFileAsText(name, inputFormat, keyClass, valueClass, keyWrap, valueWrap)
    return RDD(jrdd, self)
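
On a collect(), the Python side reads back the stream that writeAsPickle produces: each element is a 4-byte big-endian length (DataOutputStream.writeInt) followed by a pickle payload. A minimal reader sketch of that framing (read_pickled_elements is a hypothetical helper, not the actual PySpark deserializer):

import pickle
import struct

def read_pickled_elements(stream):
    # Each frame: 4-byte big-endian length, then a self-contained pickle.
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # end of stream
        (length,) = struct.unpack(">i", header)
        yield pickle.loads(stream.read(length))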