@thanoojgithub
Created November 4, 2020 05:58
Spark SQL notes
start-dfs.sh
start-yarn.sh
jps
sudo mkdir /tmp/spark-events
sudo chown -R hduser:hadoop /tmp/spark-events
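/tmp/spark-events is the default location for spark.eventLog.dir, but event logging itself is off by default. A minimal sketch of enabling it when launching the shell, using the standard Spark properties:

spark-shell --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=file:///tmp/spark-events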
hduser@thanoojubuntu-Inspiron-3521:~$ start-master.sh
hduser@thanoojubuntu-Inspiron-3521:~$ start-slave.sh spark://thanoojubuntu-Inspiron-3521:7077
starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-2.4.6-bin-hadoop2.7/logs/spark-hduser-org.apache.spark.deploy.worker.Worker-1-thanoojubuntu-Inspiron-3521.out
hduser@thanoojubuntu-Inspiron-3521:/tmp$ spark-shell --master spark://thanoojubuntu-Inspiron-3521:7077
20/11/02 20:51:14 WARN util.Utils: Your hostname, thanoojubuntu-Inspiron-3521 resolves to a loopback address: 127.0.1.1; using 192.168.225.20 instead (on interface wlp8s0)
20/11/02 20:51:14 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.225.20:4040
Spark context available as 'sc' (master = spark://thanoojubuntu-Inspiron-3521:7077, app id = app-20201102205125-0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_272)
Type in expressions to have them evaluated.
Type :help for more information.
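The loopback warning above can be avoided by pinning the bind address before launching the shell (the IP below is the one the log itself reported):

export SPARK_LOCAL_IP=192.168.225.20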
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(3))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7f219e84
scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@758e6acd
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@4b682e71
scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1828eff
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@223095d3
scala> wordCounts.print()   // register an output operation before starting the context
scala> ssc.start()
scala> ssc.awaitTermination()
// to persist instead of printing: wordCounts.saveAsTextFiles("prefix", "suffix") // need to try
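The socket source above expects something listening on localhost:9999; the usual companion from the Spark docs is netcat, run in a separate terminal:

nc -lk 9999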
--------------------------------------------------------------------------------------------
After a context is defined, you have to do the following (a consolidated sketch follows the points below).
Define the input sources by creating input DStreams.
Define the streaming computations by applying transformation and output operations to DStreams.
Start receiving data and processing it using streamingContext.start().
Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
The processing can be manually stopped using streamingContext.stop().
Points to remember:
Once a context has been started, no new streaming computations can be set up or added to it.
Once a context has been stopped, it cannot be restarted.
Only one StreamingContext can be active in a JVM at the same time.
stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
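Putting those points together, a minimal end-to-end sketch of the lifecycle; host, port and batch interval are placeholders, and it assumes the existing shell's sc with no other active StreamingContext:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(3))       // define the context on the existing sc
val lines = ssc.socketTextStream("localhost", 9999)  // 1. input DStream
val counts = lines.flatMap(_.split(" "))             // 2. transformations ...
  .map((_, 1))
  .reduceByKey(_ + _)
counts.print()                                       //    ... plus an output operation
ssc.start()                                          // 3. start receiving
ssc.awaitTermination()                               // 4. wait for stop or error
// ssc.stop(stopSparkContext = false)                // 5. stop the stream, keep sc reusable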
---------------------------------------------------------------------------------------------
hduser@thanoojubuntu-Inspiron-3521:~$ cat account.json
{"id": "100","name": "ram","location": "Ayodhya"}
{"id": "101","name": "sita","location": "Midhila"}
{"id": "102","name": "lakshman","location": "Ayodhya"}
hduser@thanoojubuntu-Inspiron-3521:~$ hdfs dfs -put account.json jsoninputs/jsonOne/
hduser@thanoojubuntu-Inspiron-3521:~$ hdfs dfs -ls jsoninputs/jsonOne
Found 1 items
-rw-r--r-- 1 hduser hadoop 159 2020-11-04 10:54 jsoninputs/jsonOne/account.json
hduser@thanoojubuntu-Inspiron-3521:~$
scala> val fileDF = spark.read.json("hdfs://localhost:9000/user/hduser/jsoninputs/jsonOne/account.json")
fileDF: org.apache.spark.sql.DataFrame = [id: string, location: string ... 1 more field]
scala> fileDF.show()
+---+--------+--------+
| id|location|    name|
+---+--------+--------+
|100| Ayodhya|     ram|
|101| Midhila|    sita|
|102| Ayodhya|lakshman|
+---+--------+--------+
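Note that schema inference typed every field as string. A sketch of supplying an explicit schema instead; accountSchema and typedDF are illustrative names, and treating id as an integer is an assumption:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// assumption: id should be numeric; everything else stays a string
val accountSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("location", StringType, nullable = true)))

val typedDF = spark.read
  .schema(accountSchema)   // skip inference, use the declared types
  .json("hdfs://localhost:9000/user/hduser/jsoninputs/jsonOne/account.json")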
scala> import spark.implicits._   // enables the $"col" syntax (pre-imported in spark-shell)
import spark.implicits._
scala> fileDF.select($"name", $"location").show()
+--------+--------+
|    name|location|
+--------+--------+
|     ram| Ayodhya|
|    sita| Midhila|
|lakshman| Ayodhya|
+--------+--------+
scala> fileDF.printSchema()
root
 |-- id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
scala> fileDF.select("name").show()
+--------+
|    name|
+--------+
|     ram|
|    sita|
|lakshman|
+--------+
scala> fileDF.groupBy("location").count().show()
+--------+-----+
|location|count|
+--------+-----+
| Ayodhya|    2|
| Midhila|    1|
+--------+-----+
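For reference, the same aggregation can also be ordered; sorting by the count column descending is just an illustrative variation:

fileDF.groupBy("location").count().orderBy($"count".desc).show()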
scala> fileDF.createOrReplaceTempView("accounts")   // register the view before querying it
scala> val sqlDF = spark.sql("SELECT * FROM accounts")
sqlDF: org.apache.spark.sql.DataFrame = [id: string, location: string ... 1 more field]
scala> sqlDF.show()
+---+--------+--------+
| id|location|    name|
+---+--------+--------+
|100| Ayodhya|     ram|
|101| Midhila|    sita|
|102| Ayodhya|lakshman|
+---+--------+--------+
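One possible follow-up: persist the query result back to HDFS as Parquet. The output path here is a placeholder, not one used above:

// sketch: write the result out; "overwrite" replaces any previous run
sqlDF.write.mode("overwrite").parquet("hdfs://localhost:9000/user/hduser/jsonoutputs/accounts")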