@MLnick
Last active November 29, 2016 06:28
PySpark / Hadoop InputFormat interop
// Python RDD creation functions //

// SequenceFile converted to Text and then to String
def sequenceFileAsText(path: String) = {
  implicit val kcm = ClassManifest.fromClass(classOf[Text])
  implicit val fcm = ClassManifest.fromClass(classOf[SequenceFileAsTextInputFormat])
  new JavaPairRDD(sc
    .newAPIHadoopFile[Text, Text, SequenceFileAsTextInputFormat](path)
    .map{ case (k, v) => (k.toString, v.toString) }
  )
}

// Arbitrary Hadoop InputFormat, key class and value class converted to String
def newHadoopFileAsText[K, V](path: String,
                              inputFormatClazz: String,
                              keyClazz: String,
                              valueClazz: String,
                              keyWrapper: String,
                              valueWrapper: String): JavaPairRDD[String, String] = {
  newHadoopFileFromClassNames(path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
    .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) })
}

private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](
    path: String,
    inputFormatClazz: String,
    keyClazz: String,
    valueClazz: String,
    keyWrapper: String,
    valueWrapper: String): JavaPairRDD[K, V] = {
  implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
  implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
  implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz))
  val kc = kcm.erasure.asInstanceOf[Class[K]]
  val vc = vcm.erasure.asInstanceOf[Class[V]]
  val fc = fcm.erasure.asInstanceOf[Class[F]]
  new JavaPairRDD[K, V](sc.newAPIHadoopFile(path, fc, kc, vc))
}
Python 2.7.5 (default, Jun 20 2013, 11:06:30)
Type "copyright", "credits" or "license" for more information.
IPython 1.0.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
log4j:WARN No appenders could be found for logger (org.apache.spark.util.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 0.9.0-SNAPSHOT
      /_/
Using Python version 2.7.5 (default, Jun 20 2013 11:06:30)
Spark context available as sc.
In [1]: r1 = sc.newHadoopFileAsText("/tmp/data/test",
            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
            "org.apache.hadoop.io.IntWritable",
            "org.apache.hadoop.io.IntWritable")
2013-10-30 12:42:17.310 java[11249:4003] Unable to load realm info from SCDynamicStore
In [2]: r1.collect()
Out[2]: [(u'1', u'2'), (u'2', u'3'), (u'3', u'4')]
In [3]: r2 = sc.sequenceFileAsText("/tmp/data/test")
In [4]: r2.collect()
Out[4]: [(u'1', u'2'), (u'2', u'3'), (u'3', u'4')]
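The transcript reads a pre-existing SequenceFile at /tmp/data/test, but the gist doesn't show how that file was written. A minimal sketch that would produce an equivalent file of IntWritable pairs from the Scala spark-shell (the path and the single partition are just assumptions chosen to match the demo data) is:

// Hypothetical: generate a SequenceFile like the one read above.
// Spark's implicit Writable converters turn the Int keys/values into IntWritable.
sc.parallelize(Seq((1, 2), (2, 3), (3, 4)), 1)
  .saveAsSequenceFile("/tmp/data/test")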
// Writes one element to the Python worker: a 4-byte length frame followed by a pickle (protocol 2) payload.
def writeAsPickle(elem: Any, dOut: DataOutputStream) {
  if (elem.isInstanceOf[Array[Byte]]) {
    // Already-pickled bytes: frame them as-is.
    val arr = elem.asInstanceOf[Array[Byte]]
    dOut.writeInt(arr.length)
    dOut.write(arr)
  } else if (elem.isInstanceOf[scala.Tuple2[Any, Any]]) {
    val tuple = elem.asInstanceOf[scala.Tuple2[Any, Any]]
    if (tuple._1.isInstanceOf[Array[Byte]] && tuple._2.isInstanceOf[Array[Byte]]) {
      // Two already-pickled values: strip each one's header and STOP opcode and rebuild a single pickled 2-tuple.
      val t = tuple.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
      val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes
      dOut.writeInt(length)
      dOut.writeByte(Pickle.PROTO)
      dOut.writeByte(Pickle.TWO)
      dOut.write(PythonRDD.stripPickle(t._1))
      dOut.write(PythonRDD.stripPickle(t._2))
      dOut.writeByte(Pickle.TUPLE2)
      dOut.writeByte(Pickle.STOP)
    } else if (tuple._1.isInstanceOf[String] && tuple._2.isInstanceOf[String]) {
      // Hand-roll a pickle of a (unicode, unicode) 2-tuple.
      val t = tuple.asInstanceOf[scala.Tuple2[String, String]]
      val tb = (t._1.getBytes("UTF-8"), t._2.getBytes("UTF-8"))
      val length = 6 + tb._1.length + 4 + tb._2.length + 4
      dOut.writeInt(length)
      dOut.writeByte(Pickle.PROTO)
      dOut.writeByte(Pickle.TWO)
      dOut.write(Pickle.BINUNICODE)
      // BINUNICODE lengths are little-endian; DataOutputStream writes big-endian, hence reverseBytes.
      dOut.writeInt(Integer.reverseBytes(tb._1.length))
      dOut.write(tb._1)
      dOut.write(Pickle.BINUNICODE)
      dOut.writeInt(Integer.reverseBytes(tb._2.length))
      dOut.write(tb._2)
      dOut.writeByte(Pickle.TUPLE2)
      dOut.writeByte(Pickle.STOP)
    } else {
      throw new SparkException("Unexpected RDD type")
    }
  } else if (elem.isInstanceOf[String]) {
    // For uniformity, strings are wrapped into Pickles.
    val s = elem.asInstanceOf[String].getBytes("UTF-8")
    val length = 2 + 1 + 4 + s.length + 1
    dOut.writeInt(length)
    dOut.writeByte(Pickle.PROTO)
    dOut.writeByte(Pickle.TWO)
    dOut.write(Pickle.BINUNICODE)
    dOut.writeInt(Integer.reverseBytes(s.length))
    dOut.write(s)
    dOut.writeByte(Pickle.STOP)
  } else {
    throw new SparkException("Unexpected RDD type")
  }
}
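For reference, the String branch above hand-rolls a pickle protocol 2 stream. The Pickle.* constants it uses aren't shown in the gist, so the sketch below inlines the standard opcode values (PROTO = 0x80, protocol byte 0x02, BINUNICODE = 'X', STOP = '.') and builds just the pickle payload, without the 4-byte frame length that writeAsPickle writes first:

import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical standalone version of the single-String case, for inspecting the bytes.
def pickleString(s: String): Array[Byte] = {
  val utf8 = s.getBytes("UTF-8")
  val buf = new ByteArrayOutputStream()
  val dOut = new DataOutputStream(buf)
  dOut.writeByte(0x80)                              // PROTO
  dOut.writeByte(0x02)                              // protocol 2
  dOut.writeByte('X')                               // BINUNICODE
  dOut.writeInt(Integer.reverseBytes(utf8.length))  // little-endian length
  dOut.write(utf8)
  dOut.writeByte('.')                               // STOP
  dOut.flush()
  buf.toByteArray
}

pickleString("1") is the 9 bytes 80 02 58 01 00 00 00 31 2e (hex), which matches the 2 + 1 + 4 + s.length + 1 length computed above and which pickle.loads() on the Python side decodes to u'1'.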
def sequenceFileAsText(self, name):
    """
    Read a Hadoop SequenceFile with arbitrary key and value class from HDFS,
    a local file system (available on all nodes), or any Hadoop-supported file system URI,
    and return it as an RDD of (String, String) where the key and value representations
    are generated using the 'toString()' method of the relevant Java class.
    """
    #minSplits = minSplits or min(self.defaultParallelism, 2)
    jrdd = self._jsc.sequenceFileAsText(name)
    return RDD(jrdd, self)

def newHadoopFileAsText(self, name, inputFormat, keyClass, valueClass, keyWrap="", valueWrap="", minSplits=None):
    """
    Read a Hadoop file with arbitrary InputFormat, key and value class from HDFS,
    a local file system (available on all nodes), or any Hadoop-supported file system URI,
    and return it as an RDD of (String, String), where the key and value representations
    are generated using the 'toString()' method of the relevant Java class.
    """
    #minSplits = minSplits or min(self.defaultParallelism, 2)
    jrdd = self._jsc.newHadoopFileAsText(name, inputFormat, keyClass, valueClass, keyWrap, valueWrap)
    return RDD(jrdd, self)