Last active
December 26, 2015 12:19
-
-
Save MLnick/7150058 to your computer and use it in GitHub Desktop.
PySpark: reading arbitrary Hadoop InputFormats
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Welcome to | |
| ____ __ | |
| / __/__ ___ _____/ /__ | |
| _\ \/ _ \/ _ `/ __/ '_/ | |
| /__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT | |
| /_/ | |
| Using Python version 2.7.5 (default, Jun 20 2013 11:06:30) | |
| Spark context available as sc. | |
| >>> rdd = sc.newHadoopFileAsText("/tmp/data/test",\ | |
| ... "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",\ | |
| ... "org.apache.hadoop.io.IntWritable",\ | |
| ... "org.apache.hadoop.io.IntWritable") | |
| 2013-10-25 08:07:59.738 java[19474:5703] Unable to load realm info from SCDynamicStore | |
| >>> rdd.collect() | |
| [u'1\t2', u'2\t3', u'3\t4'] | |
| >>> rdd.map(lambda x: (x.split("\t")[0], x.split("\t")[1])).collect() | |
| [(u'1', u'2'), (u'2', u'3'), (u'3', u'4')] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def newHadoopFileAsText(self, name, inputFormat, keyClass, valueClass, delim="\t", minSplits=None):
    """
    Read a Hadoop file with an arbitrary InputFormat, key class and value class
    from HDFS, a local file system (available on all nodes), or any
    Hadoop-supported file system URI, and return it as an RDD of strings in
    which each record is the key and value joined by ``delim``.

    :param name: path or URI of the file to read
    :param inputFormat: fully-qualified class name of the Hadoop InputFormat
    :param keyClass: fully-qualified class name of the key's Writable class
    :param valueClass: fully-qualified class name of the value's Writable class
    :param delim: separator placed between key and value in each output string
        (default: tab)
    :param minSplits: minimum number of input splits; defaults to
        min(defaultParallelism, 2).
        NOTE(review): computed below but never forwarded to the JVM call —
        the underlying ``newHadoopFileAsText`` takes no splits argument, so
        this parameter currently has no effect. Confirm intent.
    :return: an :class:`RDD` of delimited key/value strings
    """
    minSplits = minSplits or min(self.defaultParallelism, 2)
    jrdd = self._jsc.newHadoopFileAsText(name, inputFormat, keyClass, valueClass, delim)
    return RDD(jrdd, self)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Read a file via an arbitrary new-API Hadoop InputFormat and return each
 * (key, value) record flattened into a single String of the form
 * key.toString + delimiter + value.toString.
 *
 * The key, value, and InputFormat classes are supplied as fully-qualified
 * class names and loaded reflectively, so a caller (e.g. PySpark over Py4J)
 * can name them without compile-time types.
 *
 * @param path             file path or Hadoop-supported file system URI
 * @param inputFormatClazz fully-qualified class name of the new-API InputFormat
 * @param keyClazz         fully-qualified class name of the key class
 * @param valueClazz       fully-qualified class name of the value class
 * @param delimiter        string placed between key and value in each record
 * @return a JavaRDD of delimited key/value strings
 */
def newHadoopFileAsText[K, V, F <: NewInputFormat[K, V]](
    path: String,
    inputFormatClazz: String,
    keyClazz: String,
    valueClazz: String,
    delimiter: String): JavaRDD[String] = {
  // Materialize manifests for K, V, F from the runtime-loaded classes
  // (pre-Scala-2.10 ClassManifest API). Class.forName may throw
  // ClassNotFoundException for a bad name — propagated to the caller.
  implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz))
  implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz))
  implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz))
  // Recover the erased Class objects to pass explicitly to newAPIHadoopFile.
  val kc = kcm.erasure.asInstanceOf[Class[K]]
  val vc = vcm.erasure.asInstanceOf[Class[V]]
  val fc = fcm.erasure.asInstanceOf[Class[F]]
  // Load the file as (K, V) pairs, then map each pair to one delimited String.
  new JavaRDD(sc.newAPIHadoopFile(path, fc, kc, vc)
    .map{ case (k, v) => k.toString + delimiter + v.toString })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment