Skip to content

Instantly share code, notes, and snippets.

@MLnick
Last active December 26, 2015 12:19
Show Gist options
  • Select an option

  • Save MLnick/7150058 to your computer and use it in GitHub Desktop.

Select an option

Save MLnick/7150058 to your computer and use it in GitHub Desktop.
PySpark read arbitrary Hadoop InputFormats
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT
/_/
Using Python version 2.7.5 (default, Jun 20 2013 11:06:30)
Spark context available as sc.
>>> rdd = sc.newHadoopFileAsText("/tmp/data/test",\
... "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",\
... "org.apache.hadoop.io.IntWritable",\
... "org.apache.hadoop.io.IntWritable")
2013-10-25 08:07:59.738 java[19474:5703] Unable to load realm info from SCDynamicStore
>>> rdd.collect()
[u'1\t2', u'2\t3', u'3\t4']
>>> rdd.map(lambda x: (x.split("\t")[0], x.split("\t")[1])).collect()
[(u'1', u'2'), (u'2', u'3'), (u'3', u'4')]
def newHadoopFileAsText(self, name, inputFormat, keyClass, valueClass, delim="\t", minSplits=None):
    """
    Read a Hadoop file with an arbitrary InputFormat, key class and value class
    from HDFS, a local file system (available on all nodes), or any
    Hadoop-supported file system URI, and return it as an RDD of strings where
    each record is ``str(key) + delim + str(value)``.

    :param name: path to the file or directory to read
    :param inputFormat: fully-qualified class name of the Hadoop InputFormat
    :param keyClass: fully-qualified class name of the key Writable
    :param valueClass: fully-qualified class name of the value Writable
    :param delim: separator placed between key and value in each record
        (default: tab)
    :param minSplits: minimum number of Hadoop splits; defaults to
        ``min(defaultParallelism, 2)``.
        NOTE(review): this value is computed but never forwarded to the JVM
        call below (the Scala counterpart does not accept it) -- confirm
        whether it should be plumbed through or dropped.
    """
    minSplits = minSplits or min(self.defaultParallelism, 2)
    # Delegates to the Scala-side helper, which maps each (k, v) pair to a
    # single delimiter-joined string before handing the RDD back to Python.
    jrdd = self._jsc.newHadoopFileAsText(name, inputFormat, keyClass, valueClass, delim)
    return RDD(jrdd, self)
/**
 * Reads a file using the (new-API) Hadoop `InputFormat` named by
 * `inputFormatClazz`, with key/value classes named by `keyClazz` and
 * `valueClazz`, and returns the records as strings of the form
 * `key.toString + delimiter + value.toString`.
 *
 * Class names are resolved reflectively at runtime so the caller (PySpark)
 * can pass them as plain strings.
 */
def newHadoopFileAsText[K, V, F <: NewInputFormat[K, V]](
    path: String,
    inputFormatClazz: String,
    keyClazz: String,
    valueClazz: String,
    delimiter: String): JavaRDD[String] = {
  // ClassManifests are required implicitly by newAPIHadoopFile
  // (pre-Scala-2.10 reflection API, as used by Spark at this version).
  implicit val keyManifest = ClassManifest.fromClass(Class.forName(keyClazz))
  implicit val valueManifest = ClassManifest.fromClass(Class.forName(valueClazz))
  implicit val formatManifest = ClassManifest.fromClass(Class.forName(inputFormatClazz))

  val keyClass = keyManifest.erasure.asInstanceOf[Class[K]]
  val valueClass = valueManifest.erasure.asInstanceOf[Class[V]]
  val formatClass = formatManifest.erasure.asInstanceOf[Class[F]]

  // Load the file, then flatten each (key, value) pair into one delimited line.
  val lines = sc.newAPIHadoopFile(path, formatClass, keyClass, valueClass)
    .map(pair => pair._1.toString + delimiter + pair._2.toString)
  new JavaRDD(lines)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment