PySpark HelloWorld App + GraphFrames
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "PySpark HelloWorld App + GraphFrames",
"provenance": [],
"collapsed_sections": [],
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/habedi/c0c9ba18ab57d7adf6d5a11671dd8920/pyspark-helloworld-app-graphframes.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KB16uBsJDrxY"
},
"source": [
"### I used contents from these sources to create this Colab notebook: \r\n", | |
" 1. https://colab.research.google.com/github/asifahmed90/pyspark-ML-in-Colab/blob/master/PySpark_Regression_Analysis.ipynb\r\n", | |
" 2. https://gist.github.com/dvainrub/b6178dc0e976e56abe9caa9b72f73d4a\r\n", | |
" 3. https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5" | |
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "LupFC2QBLB9P"
},
"source": [
"# **OUTCOME: having an enviornment to develop Spark apps in Python3**" | |
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "sq8U3BtmhtRx"
},
"source": [
"## **Step 0: setting things up in Google Colab**\n", | |
"\n", | |
"First, we need to install all the dependencies in Colab environment like Apache `Spark 3 with Hadoop 2.7`, `Python3`, `Java 11` (and a helper Python package named `Findspark`). \n", | |
"\n", | |
"Please note that you might need to update Spark's version to a newer value if, after executing the code in the cell bellow, you get an error like `wget` can't find and download `spark-3.0.2-*`" | |
]
},
{
"cell_type": "code",
"metadata": {
"id": "lh5NCoc8fsSO",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "64d70487-2aec-4978-bbdd-d2ce27943e88"
},
"source": [
"!apt-get install openjdk-11-jdk-headless -qq > /dev/null\n",
"!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz\n",
"!tar xf spark-3.0.2-bin-hadoop2.7.tgz\n",
"!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*\n",
"!pip -q install findspark pyspark graphframes"
],
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"text": [
"\u001b[K |████████████████████████████████| 212.3MB 72kB/s \n",
"\u001b[K |████████████████████████████████| 204kB 21.2MB/s \n",
"\u001b[K |████████████████████████████████| 163kB 53.8MB/s \n",
"\u001b[?25h Building wheel for pyspark (setup.py) ... \u001b[?25l\u001b[?25hdone\n"
],
"name": "stdout"
}
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ILheUROOhprv"
},
"source": [
"Now that you installed Spark and Java in Colab, it is time to set some environment variables. We need to set the values for `JAVA_HOME` and `SPARK_HOME` (and `HADOOP_HOME`), as shown below:" | |
]
},
{
"cell_type": "code",
"metadata": {
"id": "v1b8k_OVf2QF"
},
"source": [
"import os\n",
"\n",
"os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-11-openjdk-amd64\"\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-3.0.2-bin-hadoop2.7\"\n",
"os.environ[\"HADOOP_HOME\"] = os.environ[\"SPARK_HOME\"]\n",
"\n",
"os.environ[\"PYSPARK_DRIVER_PYTHON\"] = \"jupyter\"\n",
"os.environ[\"PYSPARK_DRIVER_PYTHON_OPTS\"] = \"notebook\"\n",
"os.environ[\"PYSPARK_SUBMIT_ARGS\"] = \"--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell\""
],
"execution_count": 2,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "1mjULZnTHVnU"
},
"source": [
"## **Step 1: downloading project's dataset**\r\n", | |
"Now let's download the project's dataset from Github. You can read the dataset for the course's project from `datasets/data/TDT4305_S2021`" | |
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "eG6eGOe7Hcu_",
"outputId": "54347549-4187-43c1-9a3a-374466a05fc7"
},
"source": [
"!rm -rf datasets\r\n",
"!git clone --depth=1 -q https://github.com/habedi/datasets\r\n", | |
"!ls datasets/data/TDT4305_S2021" | |
], | |
"execution_count": 3, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": [ | |
" badges.csv.gz\t 'Description of the data.pdf' users.csv.gz\n", | |
" comments.csv.gz posts.csv.gz\n" | |
], | |
"name": "stdout" | |
} | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"id": "KwrqMk3HiMiE" | |
}, | |
"source": [ | |
"## **Step 2: checking the Spark installation**\r\n", | |
"Run a local spark session to test your installation:" | |
]
},
{
"cell_type": "code",
"metadata": {
"id": "9_Uz1NL4gHFx"
},
"source": [
"import findspark\n",
"findspark.init()"
],
"execution_count": 4,
"outputs": []
},
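{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check (a minimal sketch, not part of the original notebook): after `findspark.init()`, we can create a local `SparkSession` and print its version to confirm the installation works. The app name `InstallCheck` is an arbitrary choice."
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# A minimal sanity check (illustrative addition): create a local SparkSession\n",
"# and print the Spark version; it should match the build downloaded above (3.0.2).\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.master(\"local[*]\").appName(\"InstallCheck\").getOrCreate()\n",
"print(spark.version)"
],
"execution_count": null,
"outputs": []
},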
{
"cell_type": "markdown",
"metadata": {
"id": "48lL1JV3M87K"
},
"source": [
"## **Step 3: making a helper method for creating a SaprkContext variable**\r\n", | |
"You can use `init_spark` to create a new `SaprkContext variable` and use it" | |
]
},
{
"cell_type": "code",
"metadata": {
"id": "DKP2o0UyIvFZ"
},
"source": [
"from pyspark.sql import SparkSession\r\n",
"\r\n",
"def init_spark(app_name=\"HelloWorldApp\", execution_mode=\"local[*]\"):\r\n",
"    spark = SparkSession.builder.master(execution_mode).appName(app_name).getOrCreate()\r\n",
"    sc = spark.sparkContext\r\n",
"    return spark, sc"
],
"execution_count": 5,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "A2HGl1xmN7pI"
},
"source": [
"## **Step 4: a HelloWorld Spark app**\r\n", | |
"\r\n", | |
"Our first Spark application; it takes a list of numbers and squares each element and returns the list of squared numbers" | |
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "SnR5C9jhBHsB",
"outputId": "1eda2eff-1e7b-44d5-a1ee-81cfc278a5c4"
},
"source": [
"def main1():\r\n",
"    _, sc = init_spark()\r\n",
"    nums = sc.parallelize([1, 2, 3, 4])\r\n",
"    print(nums.map(lambda x: x*x).collect())\r\n",
"\r\n",
"if __name__ == '__main__':\r\n",
"    main1()"
],
"execution_count": 6,
"outputs": [
{
"output_type": "stream",
"text": [
"[1, 4, 9, 16]\n"
],
"name": "stdout"
}
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "QHA_COdYOKnO"
},
"source": [
"## **Step 5: another Saprk app that loades a CSV files into an RDD**\r\n", | |
"Another simple app that prints the first two lines of from `users.csv.gz`" | |
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "wlzNiahyCH9w",
"outputId": "da98d618-d850-4121-9113-d7b47a05f9cc"
},
"source": [
"def main2():\r\n",
"    _, sc = init_spark()\r\n",
"    lines = sc.textFile('datasets/data/TDT4305_S2021/users.csv.gz')\r\n",
"    print(lines.take(2))\r\n",
"\r\n",
"if __name__ == '__main__':\r\n",
"    main2()"
],
"execution_count": 7,
"outputs": [
{
"output_type": "stream",
"text": [
"['\"Id\"\\t\"Reputation\"\\t\"CreationDate\"\\t\"DisplayName\"\\t\"LastAccessDate\"\\t\"AboutMe\"\\t\"Views\"\\t\"UpVotes\"\\t\"DownVotes\"', \"-1\\t1\\t2014-05-13 21:29:22\\tCommunity\\t2014-05-13 21:29:22\\t<p>Hi, I'm not really a person.</p>

<p>I'm a background process that helps keep this site clean!</p>

<p>I do things like</p>

<ul>
<li>Randomly poke old unanswered questions every hour so they get some attention</li>
<li>Own community questions and answers so nobody\\t3\\t819\\t1575\"]\n" | |
],
"name": "stdout"
}
]
},
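{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch (not part of the original notebook): the output above suggests the file is tab-separated with a quoted header row, so we can split each line into fields and drop the header. Note that a record whose text fields contain embedded newlines (like the `AboutMe` value above) is split across lines by `textFile`, so this naive parser is only a starting point."
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# A hypothetical helper (assumption: tab-separated values with a quoted header row).\n",
"def parse_users():\n",
"    _, sc = init_spark()\n",
"    lines = sc.textFile('datasets/data/TDT4305_S2021/users.csv.gz')\n",
"    header = lines.first()\n",
"    # Drop the header, then split every remaining line on tabs and strip the quotes.\n",
"    rows = (lines.filter(lambda line: line != header)\n",
"                 .map(lambda line: [field.strip('\"') for field in line.split('\\t')]))\n",
"    print(rows.take(1))\n",
"\n",
"parse_users()"
],
"execution_count": null,
"outputs": []
},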
{
"cell_type": "markdown",
"metadata": {
"id": "fAEwctyiTkju"
},
"source": [
"## **Step 6: sample GraphFrames code**"
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "sIRkcwMHTvyH",
"outputId": "db5c7202-47bf-4583-e594-b3eaff28d8e4"
},
"source": [
"from pyspark.sql import SparkSession\r\n",
"from pyspark.sql import SQLContext\r\n",
"\r\n",
"_, sc = init_spark()\r\n",
"sqlContext = SQLContext(sc)\r\n",
"\r\n",
"## the rest of this code (down below) comes from: https://graphframes.github.io/graphframes/docs/_site/quick-start.html#getting-started-with-apache-spark-and-spark-packages\r\n",
"\r\n",
"# Create a Vertex DataFrame with unique ID column \"id\"\r\n",
"v = sqlContext.createDataFrame([\r\n",
"  (\"a\", \"Alice\", 34),\r\n",
"  (\"b\", \"Bob\", 36),\r\n",
"  (\"c\", \"Charlie\", 30),\r\n",
"], [\"id\", \"name\", \"age\"])\r\n",
"\r\n",
"# Create an Edge DataFrame with \"src\" and \"dst\" columns\r\n",
"e = sqlContext.createDataFrame([\r\n",
"  (\"a\", \"b\", \"friend\"),\r\n",
"  (\"b\", \"c\", \"follow\"),\r\n",
"  (\"c\", \"b\", \"follow\"),\r\n",
"], [\"src\", \"dst\", \"relationship\"])\r\n",
"\r\n",
"# Create a GraphFrame\r\n",
"from graphframes import *\r\n",
"g = GraphFrame(v, e)\r\n",
"\r\n",
"# Query: Get in-degree of each vertex.\r\n",
"g.inDegrees.show()\r\n",
"\r\n",
"# Query: Count the number of \"follow\" connections in the graph.\r\n",
"g.edges.filter(\"relationship = 'follow'\").count()\r\n",
"\r\n",
"# Run PageRank algorithm, and show results.\r\n",
"#results = g.pageRank(resetProbability=0.01, maxIter=10)\r\n",
"#results.vertices.select(\"id\", \"pagerank\").show()"
],
"execution_count": 8,
"outputs": [
{
"output_type": "stream",
"text": [
"+---+--------+\n",
"| id|inDegree|\n",
"+---+--------+\n",
"|  c|       1|\n",
"|  b|       2|\n",
"+---+--------+\n",
"\n"
],
"name": "stdout"
},
{
"output_type": "execute_result",
"data": {
"text/plain": [
"2"
]
},
"metadata": {
"tags": []
},
"execution_count": 8
}
]
},
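{
"cell_type": "markdown",
"metadata": {},
"source": [
"A short extra example (a sketch, not from the original notebook): GraphFrames also supports motif finding via `g.find()`, which searches for structural patterns expressed in a small pattern language. The pattern below matches pairs of vertices with edges in both directions, i.e. `b` and `c`, who follow each other in the sample graph."
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Motif finding (illustrative addition): find pairs of vertices connected\n",
"# by edges in both directions, then keep only mutual 'follow' relationships.\n",
"motifs = g.find(\"(x)-[e1]->(y); (y)-[e2]->(x)\")\n",
"motifs.filter(\"e1.relationship = 'follow' and e2.relationship = 'follow'\").show()"
],
"execution_count": null,
"outputs": []
},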
{
"cell_type": "markdown",
"metadata": {
"id": "vgf8F1ElW6sz"
},
"source": [
"### See https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5 for more examples"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "k3jnIXx0OOI2"
},
"source": [
"\r\n",
"## **Step 7 and beyond: create your apps down here (as many as you need)**"
]
},
{
"cell_type": "code",
"metadata": {
"id": "Y1BQDFJSJxG6"
},
"source": [
"## Add your code here"
],
"execution_count": 9,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "6jubenTqLZim"
},
"source": [
"## Add your code here"
],
"execution_count": 10,
"outputs": []
}
]
}