import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, NumericType, ShortType, StringType, StructField, StructType, TimestampType}
import org.apache.hadoop.fs.{FileSystem, Path}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
def getschemametod(): StructType = {
  StructType(
    Seq(
      StructField("dll", StringType, true),
      StructField("class", StringType, true),
      StructField("method", StringType, true),
      StructField("text", StringType, true),
      StructField("tobiv", ShortType, true)
    )
  )
}
def csvToParquet(file: Path, filetype: String, year: String, month: String): Unit = {
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")      // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .option("delimiter", ",")
    .option("charset", "ISO-8859-15")
    .option("mode", "DROPMALFORMED")
    .option("treatEmptyValuesAsNulls", "true")
    .option("nullValue", "NULL")
    //.schema(getSchemauttag())
    .load(file.toString)
  import org.apache.spark.sql.SaveMode
  val parquetTarget = s"/data/adobe/confidential/p/raw/${filetype}/v1/country=se/year=${year}/month=${month}"
  println(parquetTarget)
  df.write.mode(SaveMode.Overwrite).parquet(parquetTarget)
}
def csvToDF(file: Path, delimiter: String, charset: String = "UTF8",
            useHeader: Boolean = true, schema: Option[StructType] = None) = {
  // The two branches of the original differed only in the .schema(...) call, and both
  // asked for inferSchema even when an explicit schema was passed; build the reader once
  // and only infer types when no schema is supplied.
  val reader = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", useHeader.toString)           // Use first line of all files as header
    .option("inferSchema", schema.isEmpty.toString) // Automatically infer data types only without an explicit schema
    .option("delimiter", delimiter)
    .option("charset", charset)
    .option("mode", "DROPMALFORMED")
    .option("treatEmptyValuesAsNulls", "true")
    .option("nullValue", "NULL")
  schema match {
    case Some(s) => reader.schema(s).load(file.toString)
    case None    => reader.load(file.toString)
  }
}
def processFile(file: Path) = {
  val monthtunum = Map("Jan" -> "01", "Feb" -> "02", "Mar" -> "03", "Apr" -> "04", "May" -> "05", "Maj" -> "05", "Jun" -> "06",
                       "Jul" -> "07", "Aug" -> "08", "Sep" -> "09", "Oct" -> "10", "Nov" -> "11", "Dec" -> "12")
  val filename = file.getName()
  println(filename)
  // e.g. /Apr16_uttag.csv
  val name = filename.split('.')(0)
  val splitname = name.split('_')
  val date = splitname(0)          // e.g. "Apr16"
  val year = "20" + date.substring(3, 5)
  val month = date.substring(0, 3)
  println(name)
  println(year)
  println(monthtunum(month))
  println(file + " " + year + " " + month)
  // csvToParquet(file, "uttag", year, monthtunum(month))
}
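
// The FileSystem import at the top is otherwise unused, which suggests a driver loop
// like the minimal sketch below: glob the CSV exports on HDFS and run processFile on
// each match. The glob pattern here is an assumption (hypothetical location), not
// something given in the original gist.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.globStatus(new Path("/data/adobe/confidential/raw/*_uttag.csv")) // hypothetical path
  .map(_.getPath)
  .foreach(processFile)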
def getschedata(): StructType = {
  StructType(
    Seq(
      StructField("id", StringType, true),
      StructField("starttid", TimestampType, true),
      StructField("felkod", StringType, true),
      StructField("dll", StringType, true),
      StructField("klass", StringType, true),
      StructField("metod", StringType, true),
      StructField("indata", StringType, true),
      StructField("KlientIP", StringType, true),
      StructField("TjansteAnrop", StringType, true)
    )
  )
}
val filedata = new Path("/data/loggik/confidential/db*.csv")
val mobilefiledata = new Path("/data/loggik/confidential/mobile/db*.csv")
val dfdata = csvToDF(filedata, ",", useHeader = true, schema = Option(getschedata()))
val dfmobiledata = csvToDF(mobilefiledata, ",", useHeader = true, schema = Option(getschedata()))
dfdata.registerTempTable("dfdata")
dfmobiledata.registerTempTable("dfmobiledata")
val sqltext = """SELECT YEAR(starttid) as year, | |
MONTH(starttid) AS month, | |
trim(id) AS id, | |
starttid, | |
trim(felkod) AS felkod, | |
trim(dll) AS dll, | |
trim(klass) as klass, | |
trim(metod) as metod, | |
trim(indata) as indata, | |
trim(KlientIP) as klientip, | |
trim(TjansteAnrop) as tjansteAnrop | |
FROM dfdata""" | |
val sqltextdfmobiledata = """SELECT YEAR(starttid) as year, | |
MONTH(starttid) AS month, | |
trim(id) AS id, | |
starttid, | |
trim(felkod) AS felkod, | |
trim(dll) AS dll, | |
trim(klass) as klass, | |
trim(metod) as metod, | |
trim(indata) as indata, | |
trim(KlientIP) as klientip, | |
trim(TjansteAnrop) as tjansteAnrop | |
FROM dfmobiledata""" | |
val dfsaldata = sqlContext.sql(sqltext)
val dfsaldatamobile = sqlContext.sql(sqltextdfmobiledata)

val parquetTarget = "/data/loggik/confidential/p/db/v1"
val parquetTargetmobile = "/data/loggik/confidential/mobile/p/db/v1"

import org.apache.spark.sql.SaveMode
dfsaldata.write.partitionBy("year", "month").mode(SaveMode.Append).parquet(parquetTarget)
dfsaldatamobile.write.partitionBy("year", "month").mode(SaveMode.Append).parquet(parquetTargetmobile)
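
// Optional sanity check (my addition, not in the original gist): read the freshly
// written Parquet back; "year" and "month" should come back as partition columns.
sqlContext.read.parquet(parquetTarget).printSchema()
println(sqlContext.read.parquet(parquetTarget).count())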
// Shell command used to launch the session (run from the command line, not inside the REPL):
// spark-shell --master yarn-client --deploy-mode client --driver-cores 4 --num-executors 8 \
//   --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=gias.sebank.se -Dhttp.proxyPort=8080 -Dhttps.proxyHost=gias.sebank.se -Dhttps.proxyPort=8080" \
//   --packages com.databricks:spark-csv_2.10:1.4.0