Skip to content

Instantly share code, notes, and snippets.

// Sample (category, product-records) pair used to exercise topNProducts below.
// Each record is a CSV string: id,categoryId,name,description,price,imageUrl
// — the price is field index 4 (description is empty in these samples, hence ",,").
// NOTE(review): the label says "Bike & Skate Shop" but every record is a golf
// driver — looks like copied sample data; confirm the intended category label.
val l = ("Bike & Skate Shop", Iterable("933,42,Nike VR_S Covert Driver,,179.99,http://images.acmesports.sports/Nike+VR_S+Covert+Driver",
"934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver",
"935,42,TaylorMade RocketBallz Stage 2 Driver,,169.99,http://images.acmesports.sports/TaylorMade+RocketBallz+Stage+2+Driver",
"936,42,Cleveland Golf Classic XL Driver,,119.99,http://images.acmesports.sports/Cleveland+Golf+Classic+XL+Driver",
"937,42,Cobra AMP Cell Driver - Orange,,169.99,http://images.acmesports.sports/Cobra+AMP+Cell+Driver+-+Orange"))
/** Tags each of the topN highest-priced records with its category label.
  *
  * @param rec  pair of (category label, CSV product records); the price is
  *             CSV field index 4 and is assumed parseable as a Float.
  * @param topN number of records to keep.
  * @return the topN records ordered priciest-first, each paired with the category.
  */
def topNProducts(rec: (String, Iterable[String]), topN: Int): Iterable[(String, String)] = {
  val (category, records) = rec
  // Explicit reversed ordering reads more clearly than negating the sort key;
  // both are stable, so ties keep their input order either way.
  val byPriceDesc = records.toList.sortBy(r => r.split(",")(4).toFloat)(Ordering[Float].reverse)
  byPriceDesc.take(topN).map(record => (category, record))
}
// Duplicate of the sample data above — redefining `l` like this is only valid
// in a REPL/worksheet session; in a compiled source file it would not compile.
// Record layout: id,categoryId,name,description,price,imageUrl (price = field 4).
val l = ("Bike & Skate Shop", Iterable("933,42,Nike VR_S Covert Driver,,179.99,http://images.acmesports.sports/Nike+VR_S+Covert+Driver",
"934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver",
"935,42,TaylorMade RocketBallz Stage 2 Driver,,169.99,http://images.acmesports.sports/TaylorMade+RocketBallz+Stage+2+Driver",
"936,42,Cleveland Golf Classic XL Driver,,119.99,http://images.acmesports.sports/Cleveland+Golf+Classic+XL+Driver",
"937,42,Cobra AMP Cell Driver - Orange,,169.99,http://images.acmesports.sports/Cobra+AMP+Cell+Driver+-+Orange"))
def topNPricedProducts(rec: (String, Iterable[String]), topN: Int): Iterable[(String, String)] = {
val list = rec._2.toList
val topNPrices = list.
map(rec => rec.split(",")(4).toFloat).
// Spark Shell snippet (a script, not a standalone program): persist the raw
// products dataset as a Hadoop sequence file. Sequence files need key/value
// pairs, so each line is keyed with NullWritable — only the record text matters.
import org.apache.hadoop.io._

val products = sc.textFile("/public/retail_db/products")
val nullKeyed = products.map(rec => (NullWritable.get(), rec))
nullKeyed.saveAsSequenceFile("/user/dgadiraju/products_seq")
//Reading sequence files
def topNProducts(rec, topN):
    """Return a generator yielding the topN highest-priced product records.

    rec  -- iterable of CSV product strings; field index 4 is the price and
            is assumed non-empty (filter empty prices before calling).
    topN -- number of records to yield; values <= 0 yield nothing.

    Records are ranked by price descending; ties keep input order (stable sort).
    """
    # Original code assigned x = [] only to overwrite it immediately, and
    # wrapped islice in list() and a generator again — a plain slice suffices.
    ranked = sorted(rec, key=lambda k: float(k.split(",")[4]), reverse=True)
    # Preserve the original contract of returning a generator, not a list.
    return (record for record in ranked[:topN])
# Load the raw products dataset (one CSV line per product) from HDFS.
products = sc.textFile("/public/retail_db/products")
# Keep only records with a non-empty price field (index 4) so that
# float(...) conversions downstream do not raise ValueError.
productsFiltered = products.filter(lambda rec: rec.split(",")[4] != "")
for i in productsFiltered.\
def getTopDenseN(rec, topN):
topNPricedProducts = [ ]
topNPrices = [ ]
prodPrices = [ ]
prodPricesDesc = [ ]
#10 records in rec
for i in rec:
prodPrices.append(float(i.split(",")[4]))
#prodPrices will have only prices from the 10 records
prodPricesDesc = list(sorted(set(prodPrices), reverse=True))
# flume-logger-hdfs.conf: Read data from logs and write it to both logger and hdfs
# flume command to start the agent - flume-ng agent --name a1 --conf /home/dgadiraju/flume_example/example.conf --conf-file example.conf
# NOTE(review): --conf usually takes the Flume configuration *directory*, not a
# file path — verify the command above before using it.
# Name the components on this agent
# (one exec source fanning out to two sink/channel pairs: logger and HDFS)
a1.sources = logsource
a1.sinks = loggersink hdfssink
a1.channels = loggerchannel hdfschannel
# Describe/configure the source
# exec source: tails the output of a shell command (command configured below/elsewhere)
a1.sources.logsource.type = exec
# Create a topic named "kafkadg" with a single partition and a single replica
# (replication-factor 1 gives no fault tolerance — fine for a dev cluster only).
kafka-topics.sh --create \
--zookeeper m01.itversity.com:2181,m02.itversity.com:2181,w01.itversity.com:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafkadg
# List all topics registered in the ZooKeeper ensemble (verifies the create above).
kafka-topics.sh --list \
--zookeeper m01.itversity.com:2181,m02.itversity.com:2181,w01.itversity.com:2181
# kandf.conf: Flume and Kafka integration
# Read streaming data from logs and push it to Kafka as sink
# Name the components on this agent
# (single pipeline: one exec source -> one memory channel -> one Kafka sink)
kandf.sources = logsource
kandf.sinks = ksink
kandf.channels = mchannel
# Describe/configure the source
# exec source: tails the output of a shell command (command configured below/elsewhere)
kandf.sources.logsource.type = exec
/**
* Created by itversity on 17/03/17.
* This is primarily to get the word count on the data received from
* nc -lk 19999
* Make sure build.sbt is updated with the dependency -
* libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
* Create jar, ship the jar, start nc, and then use spark-submit
* spark-submit --class SparkStreamingWordCount --master yarn --conf spark.ui.port=14562 retail_2.10-1.0.jar
*/
import org.apache.spark.SparkConf
/**
* Created by itversity on 17/03/17.
*/
/* build.sbt
name := "retail"
version := "1.0"
scalaVersion := "2.10.6"