Spark SQL notes
start-dfs.sh
start-yarn.sh
jps
sudo mkdir /tmp/spark-events
sudo chown -R hduser:hadoop /tmp/spark-events
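The /tmp/spark-events directory is presumably for Spark event logging (it matches the default spark.eventLog.dir read by the history server). A minimal conf/spark-defaults.conf sketch, assuming that intent:

# enable event logging so finished applications show up in the history server
spark.eventLog.enabled           true
spark.eventLog.dir               file:///tmp/spark-events
spark.history.fs.logDirectory    file:///tmp/spark-events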
hduser@thanoojubuntu-Inspiron-3521: start-master.sh
hduser@thanoojubuntu-Inspiron-3521: start-slave.sh spark://thanoojubuntu-Inspiron-3521:7077
starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark-2.4.6-bin-hadoop2.7/logs/spark-hduser-org.apache.spark.deploy.worker.Worker-1-thanoojubuntu-Inspiron-3521.out
hduser@thanoojubuntu-Inspiron-3521:/tmp$ spark-shell --master spark://thanoojubuntu-Inspiron-3521:7077
20/11/02 20:51:14 WARN util.Utils: Your hostname, thanoojubuntu-Inspiron-3521 resolves to a loopback address: 127.0.1.1; using 192.168.225.20 instead (on interface wlp8s0)
20/11/02 20:51:14 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.225.20:4040
Spark context available as 'sc' (master = spark://thanoojubuntu-Inspiron-3521:7077, app id = app-20201102205125-0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_272)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(3))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7f219e84
scala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@758e6acd
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@4b682e71
scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1828eff
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@223095d3
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()
// need to try: wordCounts.saveAsHadoopFiles(prefix, [suffix]) -- like print(), this output operation has to be set up before ssc.start()
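For the socketTextStream example above to receive anything, a Netcat server must be listening on port 9999 in a separate terminal before words are typed into it, and the application needs more than one core because the receiver occupies one (so a worker with 2+ cores, or local[2] when running locally):

nc -lk 9999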
--------------------------------------------------------------------------------------------
After a context is defined, you have to do the following.
Define the input sources by creating input DStreams.
Define the streaming computations by applying transformation and output operations to DStreams.
Start receiving data and processing it using streamingContext.start().
Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
The processing can be manually stopped using streamingContext.stop().
Points to remember:
Once a context has been started, no new streaming computations can be set up or added to it.
Once a context has been stopped, it cannot be restarted.
Only one StreamingContext can be active in a JVM at the same time.
stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created (see the sketch below).
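A minimal sketch of the last two points, assuming ssc and sc are the StreamingContext and SparkContext from the session above:

ssc.stop(stopSparkContext = false)              // stop only the StreamingContext, keep the SparkContext alive
val ssc2 = new StreamingContext(sc, Seconds(5)) // the same SparkContext can back a new StreamingContext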
---------------------------------------------------------------------------------------------
hduser@thanoojubuntu-Inspiron-3521:~$ cat account.json
{"id": "100","name": "ram","localtion": "Ayodhya"}
{"id": "101","name": "sita","localtion": "Midhila"}
{"id": "102","name": "lakshman","localtion": "Ayodhya"}
hduser@thanoojubuntu-Inspiron-3521:~$ hdfs dfs -put account.json jsoninputs/jsonOne/
hduser@thanoojubuntu-Inspiron-3521:~$ hdfs dfs -ls jsoninputs/jsonOne
Found 1 items
-rw-r--r-- 1 hduser hadoop 159 2020-11-04 10:54 jsoninputs/jsonOne/account.json
hduser@thanoojubuntu-Inspiron-3521:~$
scala> val fileDF = spark.read.json("hdfs://localhost:9000/user/hduser/jsoninputs/jsonOne/account.json")
fileDF: org.apache.spark.sql.DataFrame = [id: string, localtion: string ... 1 more field]
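spark.read.json expects one JSON object per line by default, which is exactly how account.json is laid out. If the file were a single multi-line JSON document instead, the multiLine option would be needed (a sketch against the same path):

scala> val multiDF = spark.read.option("multiLine", true).json("hdfs://localhost:9000/user/hduser/jsoninputs/jsonOne/account.json")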
scala> fileDF.show()
+---+---------+--------+
| id|localtion|    name|
+---+---------+--------+
|100|  Ayodhya|     ram|
|101|  Midhila|    sita|
|102|  Ayodhya|lakshman|
+---+---------+--------+
scala> fileDF.select($"name", $"localtion").show()
+--------+---------+
|    name|localtion|
+--------+---------+
|     ram|  Ayodhya|
|    sita|  Midhila|
|lakshman|  Ayodhya|
+--------+---------+
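Side note: the $"col" syntax already works here even though import spark.implicits._ only appears in the next step, because spark-shell pre-imports the session implicits. In a standalone application the import has to be explicit; a minimal sketch (the object name AccountsApp is made up):

import org.apache.spark.sql.SparkSession

object AccountsApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("AccountsApp").getOrCreate()
    import spark.implicits._   // required for the $"col" syntax outside the shell
    val fileDF = spark.read.json("hdfs://localhost:9000/user/hduser/jsoninputs/jsonOne/account.json")
    fileDF.select($"name", $"localtion").show()
    spark.stop()
  }
}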
scala> fileDF.printSchema()
root
 |-- id: string (nullable = true)
 |-- localtion: string (nullable = true)
 |-- name: string (nullable = true)
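The column comes out as "localtion" because that is the key spelling inside account.json. If a corrected name is wanted downstream, the column can be renamed on the DataFrame without touching the file (a sketch):

scala> val renamedDF = fileDF.withColumnRenamed("localtion", "location")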
scala> import spark.implicits._
import spark.implicits._
scala> fileDF.select($"name", $"localtion").show()
+--------+---------+
|    name|localtion|
+--------+---------+
|     ram|  Ayodhya|
|    sita|  Midhila|
|lakshman|  Ayodhya|
+--------+---------+
scala> fileDF.select("name").show()
+--------+
|    name|
+--------+
|     ram|
|    sita|
|lakshman|
+--------+
scala> fileDF.groupBy("localtion").count().show()
+---------+-----+
|localtion|count|
+---------+-----+
|  Ayodhya|    2|
|  Midhila|    1|
+---------+-----+
scala> fileDF.createOrReplaceTempView("accounts")
scala> val sqlDF = spark.sql("SELECT * FROM accounts")
sqlDF: org.apache.spark.sql.DataFrame = [id: string, localtion: string ... 1 more field]
scala> sqlDF.show()
+---+---------+--------+
| id|localtion|    name|
+---+---------+--------+
|100|  Ayodhya|     ram|
|101|  Midhila|    sita|
|102|  Ayodhya|lakshman|
+---+---------+--------+
scala>
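For reference, the earlier groupBy("localtion").count() can also be expressed in SQL against the same temp view (a sketch, output not captured in this session):

scala> spark.sql("SELECT localtion, COUNT(*) AS count FROM accounts GROUP BY localtion").show()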