Skip to content

Instantly share code, notes, and snippets.

@brycemcd
Created August 17, 2015 19:44
Show Gist options
  • Save brycemcd/2ee20fb2c55d3a4477c7 to your computer and use it in GitHub Desktop.
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
// Load the NYC taxi fare CSV as an RDD of raw lines and mark it for caching.
// cache() returns the receiver, so the chained form binds the same RDD.
val taxis = sc
  .textFile("/media/brycemcd/filestore/third_party_data/taxi/trip_fare_1.csv")
  .cache()
/**
 * One row of the taxi trip-fare CSV.
 *
 * The first five columns are kept as raw strings (identifiers and an
 * unparsed timestamp); the remaining six are the monetary columns,
 * parsed as Float.
 */
case class Trip(
  medallion: String,        // cab medallion id
  hack_license: String,     // driver license id
  vendor_id: String,
  pickup_datetime: String,  // left as a string; never parsed in this script
  payment_type: String,
  fare_amount: Float,
  surcharge: Float,
  mta_tax: Float,
  tip_amount: Float,
  tolls_amount: Float,
  total_amount: Float
)
// http://webcache.googleusercontent.com/search?q=cache:PiatLb2iZkgJ:www.markhneedham.com/blog/2014/11/16/spark-parse-csv-file-and-group-by-column-value/+&cd=4&hl=en&ct=clnk&gl=us
// Sanity check: parse each raw line and re-join its fields with commas,
// then print the first five rows (header included at this point).
taxis
  .map { rawLine =>
    new CSVParser(',').parseLine(rawLine).mkString(",")
  }
  .take(5)
  .foreach(println)
/**
 * Removes the CSV header row from an RDD of raw lines.
 *
 * Only partition 0 contains the file's first line, so only that
 * partition has its first element skipped.
 *
 * Fix: the original called `lines.drop(1)` and discarded the result,
 * then returned `lines`. `Iterator.drop` returns a new iterator, and the
 * Iterator contract says the receiver must not be reused afterward —
 * relying on the side effect of advancing it is undefined behavior.
 * Return the dropped iterator explicitly instead.
 *
 * @param data RDD of raw CSV lines, header in partition 0
 * @return the same lines without the header row
 */
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) =>
    if (idx == 0) lines.drop(1) else lines
  )
}
// All data rows, with the CSV header removed.
val withoutHeader: RDD[String] = dropHeader(taxis)

// Second sanity check: parse and re-join the first few raw lines.
val lines = taxis.map(line => {
  val parser = new CSVParser(',')
  parser.parseLine(line).mkString(",")
})
lines.take(5).foreach(println)
// NOTE(review): the original script had a bare `columns.mkString("||")`
// statement here. `columns` is not in scope at this level (it only exists
// inside the mapPartitions closure below), so the line could not compile
// and has been removed.
// Parse each header-less CSV row into a Trip.
// mapPartitions lets us build one CSVParser per partition instead of one
// per row; the parser is created inside the closure so it never needs to
// be serialized from the driver.
val tripData = withoutHeader.mapPartitions { rows =>
  val csv = new CSVParser(',')
  rows.map { row =>
    val f = csv.parseLine(row)
    Trip(
      medallion       = f(0),
      hack_license    = f(1),
      vendor_id       = f(2),
      pickup_datetime = f(3),
      payment_type    = f(4),
      fare_amount     = f(5).toFloat,
      surcharge       = f(6).toFloat,
      mta_tax         = f(7).toFloat,
      tip_amount      = f(8).toFloat,
      tolls_amount    = f(9).toFloat,
      total_amount    = f(10).toFloat
    )
  }
}
// Cache the parsed trips: they are reused by the queries below.
tripData.cache()
// Print a small sample of parsed Trip records.
tripData.take(5).foreach(println)
tripData.flatMap(t => t.total_amount).take(5)
// Total revenue per medallion, printed as the five highest-grossing cabs.
tripData
  .map(trip => (trip.medallion, trip.total_amount))
  .reduceByKey(_ + _)          // sum fares per medallion
  .sortBy(-_._2)               // descending by total revenue
  .take(5)
  .foreach(println)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment