Created
September 17, 2015 05:41
-
-
Save jude90/5af077474dc22bb7653b to your computer and use it in GitHub Desktop.
Shuffle a CSV file by selected fields and render each group as a Parquet file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package my.spark | |
/** | |
* Created by jude on 15-9-16. | |
*/ | |
import scala.collection.JavaConversions.seqAsJavaList | |
import java.io.{FileReader, BufferedReader, File} | |
import java.util | |
import java.util.regex.Pattern | |
import breeze.io.TextReader.FileReader | |
import org.apache.spark.{SparkConf, SparkContext} | |
import java.util.{Arrays, Date} | |
import mdw._ | |
import parquet.schema.{MessageTypeParser, MessageType} | |
import org.apache.hadoop.fs.Path | |
object Csv2Parq {

  /**
   * Entry point: reads a CSV file, groups rows by an (hour, num) key derived
   * from the first and fifth columns, and writes each group as a Parquet file
   * under outputs/&lt;hour&gt;/&lt;num&gt;.parquet.
   */
  def main(args: Array[String]): Unit = {
    val sconf = new SparkConf().setMaster("local[4]").setAppName("CSV2Parquet")
    val sc = new SparkContext(sconf)
    val csvfile = sc.textFile("/home/jude/GPRS.data.csv", 100)

    // Read the schema text once on the driver instead of once per group inside
    // the map closure; the String is serializable and ships with the closure.
    // NOTE(review): `Csv2Parque` (trailing 'e') is referenced while this object
    // is `Csv2Parq` — confirm the helper object's actual name (likely in mdw._).
    val schemaText = Csv2Parque.readFile("1k.schema")

    val grouped = csvfile.groupBy { line =>
      val fields = line.split(",")
      // Field 0 feeds java.util.Date(long millis). Parse as Long: an
      // epoch-millis value overflows Int and Integer.parseInt would throw.
      // (getHours is deprecated but kept for identical bucketing behavior.)
      val hour = new Date(java.lang.Long.parseLong(fields(0))).getHours
      // A blank field 4 maps to bucket 0; otherwise chars 3-5 form the key.
      val num =
        if (fields(4).equals(" ")) 0
        else Integer.parseInt(fields(4).substring(3, 6))
      hour -> num
    }.cache()

    grouped.map { case ((hour, num), lines) =>
      // MessageType is parsed per group because the parsed form may not be
      // serializable; only the schema text crosses the closure boundary.
      val messageType = MessageTypeParser.parseMessageType(schemaText)
      val writer = new CsvParquetWriter(new Path("outputs/" + hour + "/" + num + ".parquet"), messageType)
      try {
        lines.foreach { line =>
          // seqAsJavaList (imported at top of file) converts the Scala List
          // to the java.util.List the writer expects.
          writer.write(line.split(Pattern.quote(",")).toList)
        }
      } finally {
        // Always close so the Parquet footer is written even if a row fails.
        writer.close()
      }
      "OK"
    }.collect()

    sc.stop()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment