Skip to content

Instantly share code, notes, and snippets.

@ariens
Created June 26, 2015 15:24
Show Gist options
  • Save ariens/2c44c30e064b1790146a to your computer and use it in GitHub Desktop.
Save ariens/2c44c30e064b1790146a to your computer and use it in GitHub Desktop.
Spark-Boom-Demo
package com.blackberry.bdp.sparkMesosDemo
import com.blackberry.bdp.containers.boom._
import com.twitter.chill.avro.AvroSerializer
import com.twitter.chill.avro.AvroSerializer.SpecificRecordBinarySerializer
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
import org.apache.hadoop.io.NullWritable
import com.twitter.chill.avro.AvroSerializer
import com.blackberry.bdp.containers.boom._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.avro._
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.io.NullWritable
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo entry point: logs in to Hadoop from a keytab, then sets up a Spark RDD
 * over an Avro file of `LogBlock` records.
 *
 * Expected arguments, in order:
 *   - `args(0)`: principal to log in as
 *   - `args(1)`: path to that principal's keytab
 *   - `args(2)`: path to the boom (Avro) input file
 */
object Main {

  /**
   * Validates the command line, performs the keytab login, and builds the
   * input RDD. Prints a usage message and exits with status 1 when fewer
   * than three arguments are supplied.
   *
   * @param args command-line arguments: principal, keytab, input path
   */
  def main(args: Array[String]): Unit = {
    println("Hello, world!")

    // All three positional arguments are read below (args(0), args(1), args(2)),
    // so anything shorter than three must be rejected up front.
    if (args.length < 3) {
      System.err.println("Usage: ./spark-submit <options> <principal> <keytab> <path to boom file in hdfs>")
      System.exit(1)
    }

    val userPrincipal = args(0)
    val userKeytab = args(1)
    val input = args(2)

    // val utils = new SparkHadoopUtil()
    // utils.loginUserFromKeytab(userPrincipal, userKeytab)
    UserGroupInformation.loginUserFromKeytab(userPrincipal, userKeytab)

    // Use Kryo for serialization, with the project's registrator class.
    val conf = new SparkConf().setAppName("Example-Spark")
    conf.set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    conf.set("spark.kryo.registrator", classOf[com.blackberry.bdp.sparkMesosDemo.Registrator].getName)

    val sc = new SparkContext(conf)
    try {
      // The Job is only used as a carrier for the Hadoop configuration that
      // holds the Avro input key schema.
      val readJob = new Job()
      AvroJob.setInputKeySchema(readJob, LogBlock.getClassSchema())

      val rdd = sc.newAPIHadoopFile(
        input,
        classOf[AvroKeyInputFormat[LogBlock]],
        classOf[AvroKey[LogBlock]],
        classOf[NullWritable],
        readJob.getConfiguration)
      //println(rdd.first())
      //println(rdd.count())
    } finally {
      // Always release the context, even if setting up the RDD fails.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment