Skip to content

Instantly share code, notes, and snippets.

@jude90
Created September 17, 2015 05:41
Show Gist options
  • Save jude90/5af077474dc22bb7653b to your computer and use it in GitHub Desktop.
Save jude90/5af077474dc22bb7653b to your computer and use it in GitHub Desktop.
Shuffle a CSV file by selected fields and write each group out as a Parquet file.
package my.spark
/**
* Created by jude on 15-9-16.
*/
import scala.collection.JavaConversions.seqAsJavaList
import java.io.{FileReader, BufferedReader, File}
import java.util
import java.util.regex.Pattern
import breeze.io.TextReader.FileReader
import org.apache.spark.{SparkConf, SparkContext}
import java.util.{Arrays, Date}
import mdw._
import parquet.schema.{MessageTypeParser, MessageType}
import org.apache.hadoop.fs.Path
object Csv2Parq {

  /** Reads an entire text file into a single String (used for the Parquet schema).
    * Replaces the call to the undefined `Csv2Parque.readFile`.
    */
  private def readSchemaFile(path: String): String = {
    val source = scala.io.Source.fromFile(path)
    try source.mkString finally source.close()
  }

  /**
    * Groups the rows of a CSV file by (hour-of-day, numeric prefix of field 4)
    * and writes each group to its own Parquet file under
    * `outputs/<hour>/<num>.parquet`.
    */
  def main(args: Array[String]): Unit = {
    val sconf = new SparkConf().setMaster("local[4]").setAppName("CSV2Parquet")
    val sc = new SparkContext(sconf)
    try {
      val csvfile = sc.textFile("/home/jude/GPRS.data.csv", 100)

      // Key every line by (hour, num). The RDD is consumed exactly once below,
      // so the previous `.cache()` bought nothing and is dropped.
      val grouped = csvfile.groupBy { line =>
        val fields = line.split(",")
        // NOTE(review): field 0 is parsed as an Int and handed to
        // java.util.Date, which expects epoch *milliseconds*; an Int overflows
        // for any recent timestamp. Confirm the actual unit of field 0.
        val hour = new Date(Integer.parseInt(fields(0)).toLong).getHours
        val num =
          if (fields(4).equals(" ")) 0
          else Integer.parseInt(fields(4).substring(3, 6))
        hour -> num
      }

      // Read the schema text once on the driver. A String serializes cleanly
      // to the executors; MessageType is parsed per task because its
      // serializability is not guaranteed.
      val schemaText = readSchemaFile("1k.schema")

      // foreach instead of map(...).collect(): this is a pure side effect,
      // so there is no point shipping an array of "OK" back to the driver.
      grouped.foreach { case ((hour, num), lines) =>
        val messageType = MessageTypeParser.parseMessageType(schemaText)
        val writer = new CsvParquetWriter(
          new Path("outputs/" + hour + "/" + num + ".parquet"), messageType)
        try {
          lines.foreach { line =>
            writer.write(line.split(Pattern.quote(",")).toList)
          }
        } finally {
          writer.close() // always release the file handle, even on failure
        }
      }
    } finally {
      sc.stop() // stop the context even if the job throws
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment