@thanoojgithub
Created February 12, 2017 20:30
Apache Spark with Scala
Apache Spark is a general-purpose computation/execution engine.
It uses RDDs, which are resilient: each RDD carries its lineage, so lost partitions can be recomputed rather than recovered from the underlying HDFS.
Transformations produce new RDDs; immutability keeps the data consistent for parallel work.
Evaluation is lazy: nothing runs until an action is called.
Benefits:
Fault recovery using lineage
Optimized for in-memory computation - computations are placed optimally using a directed acyclic graph (DAG)
Easy programming - apply transformations on RDDs and trigger them by calling actions (see the sketch below)
Rich library support - MLlib (machine learning), GraphX, DataFrames, including batch and streaming
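A minimal sketch of lazy evaluation and lineage in the Scala shell (assuming `sc` is bound, as in the spark-shell session below; the HDFS path is the one used later in these notes):

// Transformations only record lineage; nothing executes yet.
val lines = sc.textFile("hdfs:/input/textme.txt")
val words = lines.flatMap(_.split(" "))   // new RDD whose parent is lines
val pairs = words.map(word => (word, 1))  // new RDD whose parent is words

// toDebugString prints the recorded lineage used for fault recovery.
println(pairs.toDebugString)

// Only an action (count, collect, saveAsTextFile, ...) triggers computation.
val total = pairs.count()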
HADOOP
1. scalability + unstructured data + reliability + immutable data for parallelism + built on HDFS
2. processing framework - MapReduce
3. boilerplate coding
Spark
1. in-memory framework - Spark Core, batch (Spark SQL) and real-time (Spark Streaming)
2. fast development via an improved, developer-friendly API in a language like Scala
3. RDD - resilient distributed dataset
Immutable (in value) - read-only, hence safe to parallelize and to cache
- every transformation results in a new RDD
- transformations should not have side effects
Lazy evaluation - transformations only build a plan; it runs when an action is called
Type inferred - Scala infers the RDD element types, as the sketch below shows
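A short sketch of immutability, caching, and type inference (the types in the comments are what the REPL infers; sc.parallelize is used here just to keep the example self-contained):

val nums = sc.parallelize(1 to 4)  // org.apache.spark.rdd.RDD[Int], type inferred
val doubled = nums.map(_ * 2)      // returns a NEW RDD; nums itself is untouched
doubled.cache()                    // read-only data is safe to cache and share
doubled.collect()                  // Array(2, 4, 6, 8)
nums.collect()                     // original unchanged: Array(1, 2, 3, 4)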
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -mkdir /input/
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -put /home/thanooj/input/textme.txt /input/
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -ls /input/
Found 1 items
-rw-r--r-- 1 thanooj supergroup 30 2017-02-13 01:26 /input/textme.txt
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -cat /input/textme.txt
this is a test
this is a test
thanooj@thanooj-Inspiron-3521:~/input$
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -rm /input/textme.txt
17/02/13 01:46:36 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /input/textme.txt
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -put /home/thanooj/input/textme.txt /input/
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -gedit /input/textme.txt
-gedit: Unknown command
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -vi /input/textme.txt
-vi: Unknown command
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -rm -r /output/
17/02/13 01:54:36 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /output
thanooj@thanooj-Inspiron-3521:~/input$
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -ls /
Found 3 items
drwx-wx-wx - thanooj supergroup 0 2017-02-13 01:10 /home
drwxr-xr-x - thanooj supergroup 0 2017-02-13 01:46 /input
drwx-wx-wx - thanooj supergroup 0 2017-02-13 01:12 /tmp
thanooj@thanooj-Inspiron-3521:~/input$
thanooj@thanooj-Inspiron-3521:~$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/13 01:12:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/13 01:12:18 WARN util.Utils: Your hostname, thanooj-Inspiron-3521 resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp8s0)
17/02/13 01:12:18 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/02/13 01:12:30 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.1.8:4040
Spark context available as 'sc' (master = local[*], app id = local-1486928539720).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val wordCount = sc.textFile("hdfs:/input/textme.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collectAsMap()
wordCount: scala.collection.Map[String,Int] = Map(is -> 2, a -> 2, this -> 2, test -> 2)
scala> sc.textFile("hdfs:/input/textme.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).saveAsTextFile("hdfs:/outbox/")
scala> val textFile = sc.textFile("hdfs:/input/textme.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs:/input/textme.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val wordSplits = textFile.flatMap(line => line.split(" "))
wordSplits: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at <console>:26
scala> val wordMap = wordSplits.map(word => (word,1))
wordMap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:28
scala> val wordCountTuple = wordMap.reduceByKey(_+_)
wordCountTuple: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:30
scala> val wordCountMap = wordCountTuple.collectAsMap()
wordCountMap: scala.collection.Map[String,Int] = Map(is -> 2, a -> 2, this -> 2, test -> 2)
scala> val wordCountMap = wordCountTuple.collect
wordCountMap: Array[(String, Int)] = Array((this,2), (is,2), (test,2), (a,2))
scala> sc.textFile("hdfs:/input/textme.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).saveAsTextFile("hdfs:/output/")
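The same count can also be written against the DataFrame/Dataset API mentioned above (a sketch, assuming the `spark` session this shell provides; in spark-shell the needed implicits are already imported):

val counts = spark.read.textFile("hdfs:/input/textme.txt") // Dataset[String], single column "value"
  .flatMap(line => line.split(" "))
  .groupBy("value")                                        // group by the token column
  .count()
counts.show()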
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -ls /
Found 4 items
drwx-wx-wx - thanooj supergroup 0 2017-02-13 01:10 /home
drwxr-xr-x - thanooj supergroup 0 2017-02-13 01:46 /input
drwxr-xr-x - thanooj supergroup 0 2017-02-13 01:56 /output
drwx-wx-wx - thanooj supergroup 0 2017-02-13 01:12 /tmp
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -ls /output/
Found 3 items
-rw-r--r-- 1 thanooj supergroup 0 2017-02-13 01:56 /output/_SUCCESS
-rw-r--r-- 1 thanooj supergroup 816 2017-02-13 01:56 /output/part-00000
-rw-r--r-- 1 thanooj supergroup 623 2017-02-13 01:56 /output/part-00001
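Each part-0000N file holds one partition of the final RDD; there are two here because textFile defaults to a minimum of two partitions. To end up with a single output file, the RDD can be coalesced before saving (a sketch; hdfs:/output1/ is a hypothetical path):

sc.textFile("hdfs:/input/textme.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .coalesce(1)                       // merge to one partition => one part file
  .saveAsTextFile("hdfs:/output1/")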
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -cat /output/part-00000
(placing,1)
(realtime(streaming),1)
(is,2)
(API,1)
(Benifits:,1)
(its,1)
(results,1)
(coding,1)
(general,1)
(evaluation,2)
(action,1)
(until,1)
(MLib,1)
(MapReduce,1)
(data,1)
(cacheable,1)
(using,6)
(reselient(lineage,1)
(Type,1)
(way).,1)
(graph,1)
(computation/execution,1)
(Immutable,1)
(new,2)
(reliability,1)
(parallelisation,1)
(value),1)
( ,2)
(Rich,1)
(improved,1)
(parallelism,1)
(RDD,4)
(from,1)
(developer,1)
(consistency,1)
(free,1)
(frames,,1)
(calling,1)
(2.,2)
(dataset,1)
(immutable,1)
(effects,1)
(fast,1)
(directed,1)
(uses,1)
(Apache,1)
(unstructured,1)
(Fault,1)
(Lazy,2)
(graphx,,1)
(language,1)
(,32)
(purpose,1)
(computations,2)
(Scala.,1)
(frendly,1)
(framework,2)
(inmemory,1)
(batch,1)
(engine,,1)
(acyclic,1)
(own,1)
(streaming,1)
(transfermations,1)
(so,1)
(Easy,1)
(Transformations,1)
thanooj@thanooj-Inspiron-3521:~/input$ hadoop fs -cat /output/part-00001
(batch(SQL),1)
(In-memory,1)
(Spark,3)
(recovery,,1)
(on,1)
((machine,1)
(having,1)
(Immutable(about,1)
(library,1)
(plat,1)
(read-only,,1)
(1.,2)
(scalability,1)
(in,3)
(result,1)
(resilient,1)
(development,1)
(side,1)
(-,10)
(doing,1)
(3.,2)
(for,4)
(HADOOP,1)
(should,1)
(support,1)
(distributed,1)
( ,1)
(Boiler,1)
(give,1)
(Recovery,1)
(+,4)
(it,,1)
(learning),,1)
(Optimized,1)
(not,1)
(every,1)
(inferred,1)
(nature,1)
(a,1)
(called.,1)
(Core,,1)
(underlying,1)
(processing,1)
(including,1)
(lineage,1)
(HDFS,2)
(does,1)
(transformation,2)
(by,2)
(like,1)
(RDDs,1)
(optimally,1)
(actions.,1)
(programming,1)
(and,4)
thanooj@thanooj-Inspiron-3521:~/input$
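Note the (,32) and ( ,2) entries above: splitting on a single space leaves empty or whitespace-only tokens wherever the file has indentation or tabs. A sketch of a cleaner tokenization that also sorts by frequency (splitting on runs of whitespace and dropping empty tokens):

sc.textFile("hdfs:/input/textme.txt")
  .flatMap(line => line.split("\\s+")) // split on any run of whitespace
  .filter(_.nonEmpty)                  // drop empty tokens from leading whitespace
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)     // most frequent words first
  .collect()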