Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
2014-12-02 08:40:25.812 java[2479:1607] Unable to load realm mapping info from SCDynamicStore
14/12/02 08:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.

scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))
babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> babyNamesCSV.reduceByKey((n,c) => n + c).collect
res0: Array[(String, Int)] = Array((Abby,9), (David,11))

scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
res1: Array[(String, Int)] = Array((Abby,9), (David,11))
Hi Sujitpal, can you please guide me on computing an average over multiple columns in the same RDD? For example:
RDD1: Key1, Key2, Avg1, Avg2, Avg3
or
RDD2: Key1, Avg1, Avg2
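A minimal sketch of one way this could be done with aggregateByKey, assuming rows of (key, (a, b, c)) with three numeric columns; the keys and values below are made up for illustration:

val rows = sc.parallelize(Seq(
  ("k1", (1.0, 10.0, 100.0)),
  ("k1", (3.0, 30.0, 300.0)),
  ("k2", (2.0, 20.0, 200.0))
))

val avgs = rows
  .aggregateByKey((0.0, 0.0, 0.0, 0L))(
    // seqOp: fold one row into a (sumA, sumB, sumC, count) accumulator
    (acc, v) => (acc._1 + v._1, acc._2 + v._2, acc._3 + v._3, acc._4 + 1L),
    // combOp: merge accumulators coming from different partitions
    (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4)
  )
  .mapValues { case (sumA, sumB, sumC, n) => (sumA / n, sumB / n, sumC / n) }

avgs.collect.foreach(println)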
NB: This is my $0.02. I've written a fair amount of coursework over the years & I'm just trying to help here.
You might consider using better naming to more clearly illustrate things to newbies. See my fork at https://gist.github.com/matthewadams/b107599a08719b166400; in particular, https://gist.github.com/matthewadams/b107599a08719b166400/revisions. Elaboration follows.
The reduceByKey example,
babyNamesCSV.reduceByKey((n,c) => n + c).collect
uses (n,c) => n + c. I'd suggest something similar to the following:
babyNamesCSV.reduceByKey((v1, v2) => v1 + v2).collect
Reasons:
- It is not clear what n & c stand for, especially when a Spark newbie is trying to associate them with values (as distinct from keys). In the function, they correspond to "first value" & "second value".
- The symmetry of (v1, v2) more closely matches the symmetry of the required signature of the function.
The aggregateByKey example,
babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
uses (k,v) => v.toInt+k & (v,k) => k+v. Here is how you might improve the aggregateByKey example for clarity:
babyNamesCSV.aggregateByKey(0)((accum, v) => accum + v, (v1, v2) => v1 + v2).collect
Reasons:
- By using k & v, the Spark newbie might easily think they stand for "key" & "value", which these most certainly do not, in either function.
  - In the first function, they correspond to "accumulator" and "value".
  - In the second function, they correspond to "first value" & "second value".
- v.toInt+k uses an unnecessary toInt call, and inverts the order of the use of the variables relative to their declaration.
Lastly, the name babyNamesCSV is a bit verbose, and there is nothing in the example that refers to or uses the comma-separated value format. I might suggest nameCounts or similar.
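To make the accumulator-vs-value point above concrete, here is a small sketch that is not from the gist: the zero value is an empty Set[Int] while the RDD values are Ints, so the first argument of the first function can only be an accumulator, never a key.

val nameCounts = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))

val valueSets = nameCounts.aggregateByKey(Set.empty[Int])(
  (accum, v) => accum + v,   // seqOp: add one value to the per-partition set
  (s1, s2) => s1 ++ s2       // combOp: merge sets built on different partitions
)

valueSets.collect // e.g. Array((Abby,Set(4, 5)), (David,Set(6, 5)))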
@matthewadams Thanks for the clear explanation. Yes, I am new to Spark, and I misinterpreted and got confused by the above code.
val products=sc.textFile("/user/cloudera/products")
val productmap=products.map(x=>x.split(",")).map(x=>(x(1).toInt,x(4).toFloat))
productmap.take(5).foreach(println)
(2,59.98)
(2,129.99)
(2,89.99)
(2,89.99)
(2,199.99)
val countandtotal=productmap.aggregateByKey((0,0.0))((x,y)=>(x._1+1,x._2+y),(x,y)=>(x._1+y._1,x._2+y._2))
countandtotal: org.apache.spark.rdd.RDD[(Int, (Int, Double))] = ShuffledRDD[38] at aggregateByKey at <console>:31
countandtotal.take(2).foreach(println)
I want to count the number of products under each category id and the total price under each category. When I run countandtotal.take(2).foreach(println), it throws a NumberFormatException. I even changed the initial value 0.0 to 0.0f. Please help.
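The transformations are lazy, so the exception only surfaces when take runs, and a NumberFormatException comes from x(1).toInt or x(4).toFloat hitting a row whose field is empty or not numeric after the split; changing the zero value will not affect it. A sketch of one way to guard the parse, keeping your column positions (this drops malformed rows rather than failing):

import scala.util.Try

val products = sc.textFile("/user/cloudera/products")

// Keep only rows whose category id and price fields actually parse as numbers.
val productmap = products
  .map(_.split(","))
  .filter(f => f.length > 4 && Try(f(1).toInt).isSuccess && Try(f(4).toFloat).isSuccess)
  .map(f => (f(1).toInt, f(4).toFloat))

// (number of products, total price) per category id
val countandtotal = productmap.aggregateByKey((0, 0.0))(
  (acc, price) => (acc._1 + 1, acc._2 + price),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)

countandtotal.take(2).foreach(println)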
@matthewadams Good explanation, I thought the same thing.
Note that you could replace
babyNamesCSV.reduceByKey((n,c) => n + c).collect
with
babyNamesCSV.reduceByKey(_ + _).collect
@MikeC711 - hopefully you already know how to do this. If not, here is the code snippet. I came here looking for the same thing as you and found it in one of the lectures in Spark Fundamentals I from BigDataUniversity.
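The snippet itself did not come through in this copy of the thread. As a guess at what it showed (an average per key via aggregateByKey, matching the earlier question in this thread), here is a sketch; it is not the original BigDataUniversity code:

// Assumed reconstruction: average value per key using a (sum, count) accumulator.
val nameCounts = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))

val averages = nameCounts
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),   // running (sum, count) within a partition
    (a, b) => (a._1 + b._1, a._2 + b._2)    // merge partial (sum, count) pairs
  )
  .mapValues { case (sum, count) => sum.toDouble / count }

averages.collect // e.g. Array((Abby,4.5), (David,5.5))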