@mannharleen
Created September 15, 2017 06:17
Fun with RDD transformations :-
val rdd = sc.parallelize(Array(("a","c",1),("b","c",2),("a","a",5),("a","a",1)))
//rdd.collect
//res44: Array[(String, String, Int)] = Array((a,c,1), (b,c,2), (a,a,5), (a,a,1))
//------------------------------//
//-- map
rdd.map(p=> (p._1,p._2,p._3,p._3+1)).collect
//res54: Array[(String, String, Int, Int)] = Array((a,c,1,2), (b,c,2,3), (a,a,5,6), (a,a,1,2))
//------------------------------//
//-- mapPartitions: same output as map, but the function runs once per partition over an iterator
rdd.mapPartitions(x=> x.map(p=> (p._1,p._2,p._3,p._3+1))).collect
//Array[(String, String, Int, Int)] = Array((a,c,1,2), (b,c,2,3), (a,a,5,6), (a,a,1,2))
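//mapPartitions shines when you want per-partition state; a minimal sketch (not in the original gist) that tags each element with the size of its partition:
rdd.mapPartitions(iter_p => { val elems = iter_p.toList; elems.map(p => (p._1, p._2, p._3, elems.size)).iterator }).collect
//e.g. Array((a,c,1,2), (b,c,2,2), (a,a,5,2), (a,a,1,2)) when the 4 elements split 2 per partition -- depends on default parallelism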
//------------------------------//
//-- mapPartitionsWithIndex
rdd.mapPartitionsWithIndex((i, iter_p) => iter_p.map(p => (p._1, p._2, p._3, i)), true).collect
//Array[(String, String, Int, Int)] = Array((a,c,1,0), (b,c,2,0), (a,a,5,1), (a,a,1,1))
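//the partition indexes above show this shell used 2 partitions; getNumPartitions confirms the count (the value depends on spark.default.parallelism):
rdd.getNumPartitions
//e.g. res: Int = 2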
//------------------------------//
//-- sample(withReplacement: Boolean, fraction: Double, seed: Long); fraction is an expected proportion, not an exact count
rdd.sample(false,0.5,1).collect
//Array[(String, String, Int)] = Array((a,c,1), (a,a,1))
//------------------------------//
//-- union: unioning the RDD with itself twice yields 3 copies
rdd.union(rdd).union(rdd).collect
//Array[(String, String, Int)] = Array((a,c,1), (b,c,2), (a,a,5), (a,a,1), (a,c,1), (b,c,2), (a,a,5), (a,a,1), (a,c,1), (b,c,2), (a,a,5), (a,a,1))
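//union keeps duplicates (it is not a set union); chaining distinct dedupes -- a small follow-up sketch:
rdd.union(rdd).distinct.collect
//e.g. Array[(String, String, Int)] = Array((a,c,1), (b,c,2), (a,a,5), (a,a,1)) -- ordering may vary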
//------------------------------//
//-- intersection: return the elements common to both RDDs
val rddx = sc.parallelize(Array(("a","a",1)))
rdd.intersection(rddx).collect
//Array[(String, String, Int)] = Array((a,a,1))
//------------------------------//
//-- distinct
sc.parallelize(List("a","b","a")).distinct.collect
//Array[String] = Array(b, a)
//------------------------------//
//-- groupBy
sc.parallelize(List(("k1","v1.1","v1.2"),("k2","v2.1","v2.2"),("k1","v1.3","v1.4"))).groupBy(x=> x._1).collect
//Array[(String, Iterable[(String, String, String)])] = Array((k1,CompactBuffer((k1,v1.1,v1.2), (k1,v1.3,v1.4))), (k2,CompactBuffer((k2,v2.1,v2.2))))
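//for a (K,V) pair RDD, groupByKey is the keyed counterpart of groupBy -- a minimal sketch (not in the original):
sc.parallelize(List(("k1",1),("k2",2),("k1",2))).groupByKey().collect
//e.g. Array[(String, Iterable[Int])] = Array((k1,CompactBuffer(1, 2)), (k2,CompactBuffer(2))) -- ordering may vary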
//------------------------------//
//-- reduceByKey
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).reduceByKey((acc,n) => acc+n).collect
//Array[(String, Int)] = Array((k1,3), (k2,2))
//------------------------------//
//-- aggregateByKey (problem: calculate sum and count of key-values)
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).aggregateByKey((0,0))( (acc: (Int, Int), x: Int) => (acc._1+x , acc._1+1),
(acc: (Int, Int), y : (Int, Int)) => (acc._1 + y._1, acc._2+y._2)).collect
//Array[(String, (Int, Int))] = Array((k1,(3,2)), (k2,(2,1)))
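//the (sum, count) pairs above make per-key averages a one-liner with mapValues -- a small follow-up sketch:
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).aggregateByKey((0,0))( (acc: (Int, Int), x: Int) => (acc._1+x, acc._2+1),
(acc: (Int, Int), y: (Int, Int)) => (acc._1+y._1, acc._2+y._2)).mapValues{ case (sum, count) => sum.toDouble / count }.collect
//e.g. Array[(String, Double)] = Array((k1,1.5), (k2,2.0)) -- ordering may vary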
//------------------------------//
//-- sortByKey
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).sortByKey().collect
//Array[(String, Int)] = Array((k1,1), (k1,2), (k2,2))
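//sortByKey takes an ascending flag; pass false for descending key order:
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).sortByKey(false).collect
//e.g. Array[(String, Int)] = Array((k2,2), (k1,1), (k1,2)) -- relative order of equal keys may vary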
//------------------------------//
//-- join
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).join(sc.parallelize(List(("k1",5),("k2",5)))).collect
//Array[(String, (Int, Int))] = Array((k1,(1,5)), (k1,(2,5)), (k2,(2,5)))
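//leftOuterJoin keeps unmatched left-side keys, wrapping right-side values in Option -- a minimal sketch (not in the original):
sc.parallelize(List(("k1",1),("k3",3))).leftOuterJoin(sc.parallelize(List(("k1",5),("k2",5)))).collect
//e.g. Array[(String, (Int, Option[Int]))] = Array((k1,(1,Some(5))), (k3,(3,None))) -- ordering may vary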
//------------------------------//
//-- cogroup
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).cogroup(sc.parallelize(List(("k1",5),("k2",5)))).collect
//Array[(String, (Iterable[Int], Iterable[Int]))] = Array((k1,(CompactBuffer(1, 2),CompactBuffer(5))), (k2,(CompactBuffer(2),CompactBuffer(5))))
//------------------------------//
//-- cartesian
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).cartesian(sc.parallelize(List(("k1",5),("k2",5)))).collect
//Array[((String, Int), (String, Int))] = Array(((k1,1),(k1,5)), ((k1,1),(k2,5)), ((k2,2),(k1,5)), ((k1,2),(k1,5)), ((k2,2),(k2,5)), ((k1,2),(k2,5)))
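//cartesian produces every left-right pairing, so its size is |left| * |right|:
sc.parallelize(List(("k1",1),("k2",2), ("k1",2))).cartesian(sc.parallelize(List(("k1",5),("k2",5)))).count
//res: Long = 6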
//------------------------------//
//-- pipe
//pipe streams the string form of each element through an external process, one per line, and returns the process's stdout lines
//a minimal sketch, assuming a POSIX shell with `cat` available (the original gist left this blank):
rdd.pipe("cat").collect
//e.g. Array[String] = Array((a,c,1), (b,c,2), (a,a,5), (a,a,1))
//------------------------------//
//-- partitionBy
val rddkv = sc.parallelize(List(("k1",1),("k2",2),("k1",2),("k3",5),("k3",1)))
//rddkv.collect
//Array[(String, Int)] = Array((k1,1), (k2,2), (k1,2), (k3,5), (k3,1))
//partitioning with a hash partitioner over 3 partitions
rddkv.partitionBy(new org.apache.spark.HashPartitioner(3)).mapPartitionsWithIndex( (i,iter_p) => iter_p.map(x=>" index="+i+" value="+x)).collect
//Array[String] = Array(" index=0 value=(k1,1)", " index=0 value=(k1,2)", " index=1 value=(k2,2)", " index=2 value=(k3,5)", " index=2 value=(k3,1)")
//------------------------------//
//-- repartitionAndSortWithinPartitions
//creating a range partitioner to compare with (reusing rddkv from above)
rddkv.partitionBy(new org.apache.spark.RangePartitioner(3,rddkv)).mapPartitionsWithIndex( (i,iter_p) => iter_p.map(x=>" index="+i+" value="+x)).collect
//Array[String] = Array(" index=0 value=(k1,1)", " index=0 value=(k1,2)", " index=1 value=(k2,2)", " index=1 value=(k3,5)", " index=1 value=(k3,1)")
//repartitionAndSortWithinPartitions -- records are sorted by key within each partition (the output matches plain partitionBy here because the keys were already grouped)
rddkv.repartitionAndSortWithinPartitions(new org.apache.spark.RangePartitioner(3,rddkv)).mapPartitionsWithIndex( (i,iter_p) => iter_p.map(x=>" index="+i+" value="+x)).collect
//Array[String] = Array(" index=0 value=(k1,1)", " index=0 value=(k1,2)", " index=1 value=(k2,2)", " index=1 value=(k3,5)", " index=1 value=(k3,1)")