val l = ("Bike & Skate Shop", Iterable("933,42,Nike VR_S Covert Driver,,179.99,http://images.acmesports.sports/Nike+VR_S+Covert+Driver",
  "934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver",
  "935,42,TaylorMade RocketBallz Stage 2 Driver,,169.99,http://images.acmesports.sports/TaylorMade+RocketBallz+Stage+2+Driver",
  "936,42,Cleveland Golf Classic XL Driver,,119.99,http://images.acmesports.sports/Cleveland+Golf+Classic+XL+Driver",
  "937,42,Cobra AMP Cell Driver - Orange,,169.99,http://images.acmesports.sports/Cobra+AMP+Cell+Driver+-+Orange"))

def topNProducts(rec: (String, Iterable[String]), topN: Int): Iterable[(String, String)] = {
  rec._2.toList.sortBy(k => -k.split(",")(4).toFloat).take(topN).map(r => (rec._1, r))
}
def topNPricedProducts(rec: (String, Iterable[String]), topN: Int): Iterable[(String, String)] = {
  val list = rec._2.toList
  // Dense top N: take the top N *distinct* prices, then keep every product at one of those prices
  val topNPrices = list.
    map(rec => rec.split(",")(4).toFloat).
    distinct.
    sortBy(p => -p).
    take(topN)
  list.
    filter(rec => topNPrices.contains(rec.split(",")(4).toFloat)).
    map(r => (rec._1, r))
}
//This is just a script, not a program
//Execute these statements as part of Spark Shell

//Writing as a sequence file
import org.apache.hadoop.io._
val products = sc.textFile("/public/retail_db/products")
products.map(rec => (NullWritable.get(), rec)).
  saveAsSequenceFile("/user/dgadiraju/products_seq")

//Reading sequence files
sc.sequenceFile("/user/dgadiraju/products_seq", classOf[NullWritable], classOf[Text]).
  map(rec => rec._2.toString).
  take(10).
  foreach(println)
def topNProducts(rec, topN):
    import itertools
    # Sort records by price (field 4) in descending order and take the first topN
    x = sorted(rec, key=lambda k: float(k.split(",")[4]), reverse=True)
    return list(itertools.islice(x, 0, topN))

products = sc.textFile("/public/retail_db/products")
productsFiltered = products.filter(lambda rec: rec.split(",")[4] != "")
for i in productsFiltered. \
    map(lambda rec: (rec.split(",")[1], rec)). \
    groupByKey(). \
    flatMap(lambda rec: topNProducts(rec[1], 3)). \
    take(10):
    print(i)
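Outside Spark, the sorting logic can be sanity-checked on a few hard-coded records. This standalone copy reuses rows from the sample data shown in the Scala snippets above:

```python
import itertools

def topNProducts(rec, topN):
    # Sort records by price (field 4) in descending order and take the first topN
    x = sorted(rec, key=lambda k: float(k.split(",")[4]), reverse=True)
    return list(itertools.islice(x, 0, topN))

sample = [
    "933,42,Nike VR_S Covert Driver,,179.99,http://images.acmesports.sports/Nike+VR_S+Covert+Driver",
    "934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver",
    "936,42,Cleveland Golf Classic XL Driver,,119.99,http://images.acmesports.sports/Cleveland+Golf+Classic+XL+Driver",
]

# The two highest prices are 179.99 (id 933) and 119.99 (id 936)
top2 = topNProducts(sample, 2)
print([r.split(",")[0] for r in top2])  # ['933', '936']
```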
def getTopDenseN(rec, topN):
    topNPricedProducts = [ ]
    prodPrices = [ ]
    #Collect the price of each record in rec
    for i in rec:
        prodPrices.append(float(i.split(",")[4]))
    #Distinct prices, sorted in descending order
    prodPricesDesc = list(sorted(set(prodPrices), reverse=True))
    #Top N distinct prices
    topNPrices = prodPricesDesc[:topN]
    #Keep every record whose price is among the top N prices (ties are all retained)
    for j in rec:
        if float(j.split(",")[4]) in topNPrices:
            topNPricedProducts.append(j)
    return topNPricedProducts
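The embed cuts getTopDenseN off above; a standalone copy of the completed logic shows the "dense" behavior — records tied at the same price are all kept. Prices here are taken from the sample data used earlier:

```python
def getTopDenseN(rec, topN):
    # Dense top N: find the top N *distinct* prices, then keep every
    # record priced at one of them (so ties are all retained)
    prodPricesDesc = sorted({float(i.split(",")[4]) for i in rec}, reverse=True)
    topNPrices = prodPricesDesc[:topN]
    return [i for i in rec if float(i.split(",")[4]) in topNPrices]

sample = [
    "933,42,Nike VR_S Covert Driver,,179.99,url",
    "935,42,TaylorMade RocketBallz Stage 2 Driver,,169.99,url",
    "937,42,Cobra AMP Cell Driver - Orange,,169.99,url",
    "936,42,Cleveland Golf Classic XL Driver,,119.99,url",
]

# Top 2 distinct prices are 179.99 and 169.99, so three records survive
top2Dense = getTopDenseN(sample, 2)
print(len(top2Dense))  # 3
```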
# flume-logger-hdfs.conf: Read data from logs and write it to both logger and hdfs
# flume command to start the agent:
#   flume-ng agent --name a1 --conf /home/dgadiraju/flume_example --conf-file flume-logger-hdfs.conf

# Name the components on this agent
a1.sources = logsource
a1.sinks = loggersink hdfssink
a1.channels = loggerchannel hdfschannel

# Describe/configure the source
a1.sources.logsource.type = exec
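The embed cuts the file off after the source type. A sketch of the remaining properties, assuming an exec source tailing a log file and standard logger/HDFS sinks over memory channels — the tail command and HDFS path are illustrative placeholders, not from the original file:

```properties
a1.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Describe the sinks
a1.sinks.loggersink.type = logger
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = /user/dgadiraju/flume_demo
a1.sinks.hdfssink.hdfs.fileType = DataStream

# Use memory channels
a1.channels.loggerchannel.type = memory
a1.channels.hdfschannel.type = memory

# Bind the source and sinks to the channels
a1.sources.logsource.channels = loggerchannel hdfschannel
a1.sinks.loggersink.channel = loggerchannel
a1.sinks.hdfssink.channel = hdfschannel
```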
# Create topics
kafka-topics.sh --create \
  --zookeeper m01.itversity.com:2181,m02.itversity.com:2181,w01.itversity.com:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic kafkadg

# List all topics
kafka-topics.sh --list \
  --zookeeper m01.itversity.com:2181,m02.itversity.com:2181,w01.itversity.com:2181
# kandf.conf: Flume and Kafka integration
# Read streaming data from logs and push it to Kafka as sink

# Name the components on this agent
kandf.sources = logsource
kandf.sinks = ksink
kandf.channels = mchannel

# Describe/configure the source
kandf.sources.logsource.type = exec
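This file is also truncated after the source type. A sketch of the remaining properties, assuming an exec source and Flume's built-in Kafka sink writing to the kafkadg topic created above — the tail command and broker address are placeholders, not from the original:

```properties
kandf.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Describe the Kafka sink
kandf.sinks.ksink.type = org.apache.flume.sink.kafka.KafkaSink
kandf.sinks.ksink.brokerList = w01.itversity.com:6667
kandf.sinks.ksink.topic = kafkadg

# Use a memory channel
kandf.channels.mchannel.type = memory

# Bind the source and sink to the channel
kandf.sources.logsource.channels = mchannel
kandf.sinks.ksink.channel = mchannel
```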
/**
 * Created by itversity on 17/03/17.
 * This is primarily to get the word count on the data received from
 * nc -lk 19999
 * Make sure build.sbt is updated with the dependency -
 * libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
 * Create jar, ship the jar, start nc, and then use spark-submit
 * spark-submit --class SparkStreamingWordCount --master yarn --conf spark.ui.port=14562 retail_2.10-1.0.jar
 */
import org.apache.spark.SparkConf
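The embed ends at the first import. Consistent with the header comment (word count over a socket on port 19999, Spark 1.6 streaming API), the rest of the app might look like the sketch below; the socket host is a placeholder for wherever `nc -lk 19999` runs:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingWordCount")
    // Process the stream in 10-second batches
    val ssc = new StreamingContext(conf, Seconds(10))
    // Host is a placeholder: use the machine running `nc -lk 19999`
    val lines = ssc.socketTextStream("localhost", 19999)
    val wordCounts = lines.flatMap(_.split(" ")).
      map(word => (word, 1)).
      reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```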
/**
 * Created by itversity on 17/03/17.
 */

/* build.sbt
name := "retail"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
*/