//Assignment 1:
//Answer A:
//Reading input (Databricks console, not the Wipro VDI)
var inp = sc.textFile("/FileStore/tables/log_file.txt")
//Get the header
var header = inp.first()
//Get only detail records by removing the header
var details = inp.filter(row => row != header)
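//Alternative sketch (assumption: the header always sits at the start of partition 0):
//dropping it positionally avoids comparing every row against the header string.
//var details = inp.mapPartitionsWithIndex((idx, it) => if (idx == 0) it.drop(1) else it)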
//Column indexes after splitting on ",":
// 0 = "date",
// 1 = "time",
// 2 = "size",
// 3 = "r_version",
// 4 = "r_arch",
// 5 = "r_os",
// 6 = "package",
// 7 = "version",
// 8 = "country",
// 9 = "ip_id"
// details.first()
// Filter out records where package = "NA" and version = "NA"
// Inspect only the NA records:
// details.filter(row => row.split(",")(6) == "NA" && row.split(",")(7) == "NA").collect()
// Answer B - Ignore NA records
var validDetails = details.filter(row => row.split(",")(6) != "NA" && row.split(",")(7) != "NA")
//Answer C: download count per package
var answerC = validDetails.map(row => (row.split(",")(6), 1)).reduceByKey((x, y) => x + y).collect()
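//Optional check, not part of the assignment: a quick look at the most-downloaded packages
//by sorting the counts descending before collecting (the top-10 cutoff is an assumption).
validDetails.map(row => (row.split(",")(6), 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false).take(10)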
//Answer D:
//Create an RDD of (country, download size)
var answerD_0 = validDetails.map(row => (row.split(",")(8), row.split(",")(2).toInt))
//Sum and count per country, then divide; toDouble avoids integer truncation of the average
var answerD = answerD_0.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues { case (sum, count) => sum.toDouble / count }.collect()
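//Minimal sanity check of the sum/count averaging pattern on made-up values (the data is an assumption):
sc.parallelize(List(("US", 10), ("US", 30), ("FR", 20))).mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues { case (sum, count) => sum.toDouble / count }.collect() // Array((US,20.0), (FR,20.0))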
//Assignment 2:
// Answer A
//Reading input (Databricks console, not the Wipro VDI)
// Read and remove invalid records
var inp = sc.textFile("/FileStore/tables/log_file.txt")
var header = inp.first()
var details = inp.filter(row => row != header)
//The filter below is only needed if records with NA package and NA version must be ignored; it is not applied here
// var validDetails = details.filter(row => row.split(",")(6) != "NA" && row.split(",")(7) != "NA")
//Answer B: lookup RDD of country code -> country name
var countryMapRdd = sc.parallelize(List(("AU","AUSTRALIA"), ("FR","FRANCE"), ("NL","NETHERLANDS"), ("US","UNITED STATES"), ("DE","GERMANY"), ("GB","GREAT BRITAIN"), ("DK","DENMARK"), ("CH","CHINA")))
//Answer C:
//Strip double quotes from the country code so it joins against countryMapRdd
var answerC_0 = details.map(row => (row.split(",")(8).stripPrefix("\"").stripSuffix("\""), 1)).reduceByKey((x, y) => x + y)
var answerC = answerC_0.join(countryMapRdd).map(x => x._1 + "," + x._2._2 + "," + x._2._1)
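//Variant sketch: join() drops country codes missing from countryMapRdd; leftOuterJoin keeps them
//with a placeholder label instead ("UNKNOWN" is an assumption, not part of the assignment):
//var answerC = answerC_0.leftOuterJoin(countryMapRdd).map { case (code, (cnt, name)) => code + "," + name.getOrElse("UNKNOWN") + "," + cnt }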
//Answer D:
answerC.saveAsTextFile("/FileStore/tables/assignment2output")
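//saveAsTextFile writes one part-file per partition; coalesce to a single partition first if one
//output file is wanted (the path suffix here is an assumption):
//answerC.coalesce(1).saveAsTextFile("/FileStore/tables/assignment2output_single")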
// Assignment 3:
//Answer A:
//Reading input (Databricks console, not the Wipro VDI)
var inp = sc.textFile("/FileStore/tables/log_file.txt")
var header = inp.first()
var details = inp.filter(row => row != header)
//Answer B: build a DataFrame (toDF needs import spark.implicits._ outside a Databricks notebook)
var inputDF = details.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt, x(3), x(4), x(5), x(6), x(7), x(8), x(9))).toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")
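//Alternative sketch: let the DataFrame reader handle the header and column types directly
//(assumption: the file is well-formed CSV; inferSchema costs an extra pass over the data):
//var inputDF = spark.read.option("header", "true").option("inferSchema", "true").csv("/FileStore/tables/log_file.txt")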
//Answer C:
import org.apache.spark.sql.functions._
//Bucket by size; >= on the Medium bound so a size of exactly 100000 is not misclassified as Big
var inputDF_0 = inputDF.withColumn("Download_Type",
  when(inputDF("size") < 100000, "Small")
    .when(inputDF("size") >= 100000 && inputDF("size") < 1000000, "Medium")
    .otherwise("Big"))
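//Quick look at the derived column to verify the bucketing (the row cutoff of 5 is arbitrary):
inputDF_0.select("size", "Download_Type").show(5)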
//Answer D:
inputDF_0.groupBy("country","Download_Type").count().show()
//Answer E:
inputDF_0.groupBy("country","Download_Type").count().write.parquet("download_details.parquet")
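//Reading the parquet back verifies the write (same relative path as above):
spark.read.parquet("download_details.parquet").show(5)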
//Assignment 4:
//Answer A:
//Reading input (Databricks console, not the Wipro VDI)
var inp = sc.textFile("/FileStore/tables/log_file.txt")
var header = inp.first()
var details = inp.filter(row => row != header)
//Build the DataFrame
var inputDF = details.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt, x(3), x(4), x(5), x(6), x(7), x(8), x(9))).toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")
//Write as JSON
inputDF.write.format("json").save("/FileStore/tables/log_file.json")
//Answer B:
var inpJson = sqlContext.read.json("/FileStore/tables/log_file.json")
//Answer C:
// Max, min, average per date
inpJson.groupBy("date").agg(max("size") as "MaxSize", min("size") as "MinSize", avg("size") as "AvgSize").show()
//Write to JSON
inpJson.groupBy("date").agg(max("size") as "MaxSize", min("size") as "MinSize", avg("size") as "AvgSize").orderBy("date").write.format("json").save("/FileStore/tables/assignment_4.json")
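//Equivalent via Spark SQL for readers who prefer it (the view name "logs" is an assumption):
inpJson.createOrReplaceTempView("logs")
sqlContext.sql("SELECT date, MAX(size) AS MaxSize, MIN(size) AS MinSize, AVG(size) AS AvgSize FROM logs GROUP BY date ORDER BY date").show(5)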
//Assignment 5:
//Streaming not tested practically
import org.apache.spark.streaming.{Seconds, StreamingContext}
var ssc = new StreamingContext(sc, Seconds(10))
var details = ssc.socketTextStream("localhost", 9999)
//A DStream has no toDF; convert each micro-batch RDD to a DataFrame inside foreachRDD
details.foreachRDD { rdd =>
  var inputDF = rdd.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt, x(3), x(4), x(5), x(6), x(7), x(8), x(9))).toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")
  inputDF.filter(inputDF("size") > 100000).write.mode("append").format("csv").save("/FileStore/tables/logs_large_downloads/log_file.csv")
  inputDF.filter(inputDF("size") <= 100000).write.mode("append").format("csv").save("/FileStore/tables/logs_others/log_file.csv")
}
ssc.start()
ssc.awaitTermination()
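//To feed the socket for a local smoke test (assumption: netcat is installed), run: nc -lk 9999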