{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "PySpark HelloWorld App + GraphFrames",
"provenance": [],
"collapsed_sections": [],
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/habedi/c0c9ba18ab57d7adf6d5a11671dd8920/pyspark-helloworld-app-graphframes.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KB16uBsJDrxY"
},
"source": [
"### I used contents from these sources to create this Colab notebook: \r\n",
" 1. https://colab.research.google.com/github/asifahmed90/pyspark-ML-in-Colab/blob/master/PySpark_Regression_Analysis.ipynb\r\n",
" 2. https://gist.github.com/dvainrub/b6178dc0e976e56abe9caa9b72f73d4a\r\n",
" 3. https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "LupFC2QBLB9P"
},
"source": [
"# **OUTCOME: having an enviornment to develop Spark apps in Python3**"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "sq8U3BtmhtRx"
},
"source": [
"## **Step 0: setting things up in Google Colab**\n",
"\n",
"First, we need to install all the dependencies in Colab environment like Apache `Spark 3 with Hadoop 2.7`, `Python3`, `Java 11` (and a helper Python package named `Findspark`). \n",
"\n",
"Please note that you might need to update Spark's version to a newer value if, after executing the code in the cell bellow, you get an error like `wget` can't find and download `spark-3.0.2-*`"
]
},
{
"cell_type": "code",
"metadata": {
"id": "lh5NCoc8fsSO",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "64d70487-2aec-4978-bbdd-d2ce27943e88"
},
"source": [
"!apt-get install openjdk-11-jdk-headless -qq > /dev/null\n",
"!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz\n",
"!tar xf spark-3.0.2-bin-hadoop2.7.tgz\n",
"!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*\n",
"!pip -q install findspark pyspark graphframes"
],
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"text": [
"\u001b[K |████████████████████████████████| 212.3MB 72kB/s \n",
"\u001b[K |████████████████████████████████| 204kB 21.2MB/s \n",
"\u001b[K |████████████████████████████████| 163kB 53.8MB/s \n",
"\u001b[?25h Building wheel for pyspark (setup.py) ... \u001b[?25l\u001b[?25hdone\n"
],
"name": "stdout"
}
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ILheUROOhprv"
},
"source": [
"Now that you installed Spark and Java in Colab, it is time to set some environment variables. We need to set the values for `JAVA_HOME` and `SPARK_HOME` (and `HADOOP_HOME`), as shown below:"
]
},
{
"cell_type": "code",
"metadata": {
"id": "v1b8k_OVf2QF"
},
"source": [
"import os\n",
"\n",
"os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-11-openjdk-amd64\"\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-3.0.2-bin-hadoop2.7\"\n",
"os.environ[\"HADOOP_HOME\"] = os.environ[\"SPARK_HOME\"]\n",
"\n",
"os.environ[\"PYSPARK_DRIVER_PYTHON\"] = \"jupyter\"\n",
"os.environ[\"PYSPARK_DRIVER_PYTHON_OPTS\"] = \"notebook\"\n",
"os.environ[\"PYSPARK_SUBMIT_ARGS\"] = \"--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell\""
],
"execution_count": 2,
"outputs": []
},
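{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a minimal sketch), we can print the variables we just set and ask the JVM for its version, to confirm that `JAVA_HOME` points at a working Java 11 install:"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative verification: confirm the environment variables and the Java install.\n",
"print(os.environ[\"JAVA_HOME\"])\n",
"print(os.environ[\"SPARK_HOME\"])\n",
"!java -version"
],
"execution_count": null,
"outputs": []
},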
{
"cell_type": "markdown",
"metadata": {
"id": "1mjULZnTHVnU"
},
"source": [
"## **Step 1: downloading project's dataset**\r\n",
"Now let's download the project's dataset from Github. You can read the dataset for the course's project from `datasets/data/TDT4305_S2021`"
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "eG6eGOe7Hcu_",
"outputId": "54347549-4187-43c1-9a3a-374466a05fc7"
},
"source": [
"!rm -rf datasets\r\n",
"!git clone --depth=1 -q https://github.com/habedi/datasets\r\n",
"!ls datasets/data/TDT4305_S2021"
],
"execution_count": 3,
"outputs": [
{
"output_type": "stream",
"text": [
" badges.csv.gz\t 'Description of the data.pdf' users.csv.gz\n",
" comments.csv.gz posts.csv.gz\n"
],
"name": "stdout"
}
]
},
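{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before loading anything into Spark, it can help to peek at the raw data. A minimal sketch, assuming the standard `zcat` and `head` utilities are available (they normally are in Colab):"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative peek: print the header row of the compressed users file.\n",
"!zcat datasets/data/TDT4305_S2021/users.csv.gz | head -n 1"
],
"execution_count": null,
"outputs": []
},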
{
"cell_type": "markdown",
"metadata": {
"id": "KwrqMk3HiMiE"
},
"source": [
"## **Step 2: checking the Spark installation**\r\n",
"Run a local spark session to test your installation:"
]
},
{
"cell_type": "code",
"metadata": {
"id": "9_Uz1NL4gHFx"
},
"source": [
"import findspark\n",
"findspark.init()"
],
"execution_count": 4,
"outputs": []
},
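{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then, as a smoke test, we can start a local `SparkSession` and print its version (a minimal sketch; the app name `SmokeTest` is arbitrary):"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative smoke test: start a local session and print the Spark version.\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.master(\"local[*]\").appName(\"SmokeTest\").getOrCreate()\n",
"print(spark.version)  # should print 3.0.2 for this setup"
],
"execution_count": null,
"outputs": []
},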
{
"cell_type": "markdown",
"metadata": {
"id": "48lL1JV3M87K"
},
"source": [
"## **Step 3: making a helper method for creating a SaprkContext variable**\r\n",
"You can use `init_spark` to create a new `SaprkContext variable` and use it"
]
},
{
"cell_type": "code",
"metadata": {
"id": "DKP2o0UyIvFZ"
},
"source": [
"from pyspark.sql import SparkSession\r\n",
"\r\n",
"def init_spark(app_name=\"HelloWorldApp\", execution_mode=\"local[*]\"):\r\n",
" spark = SparkSession.builder.master(execution_mode).appName(app_name).getOrCreate()\r\n",
" sc = spark.sparkContext\r\n",
" return spark, sc"
],
"execution_count": 5,
"outputs": []
},
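{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, you can call the helper like this (a short usage sketch; note that `getOrCreate` reuses any session that already exists):"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative usage of the init_spark helper defined above.\n",
"spark, sc = init_spark(app_name=\"QuickCheck\")\n",
"print(sc.master)      # e.g. local[*]\n",
"print(spark.version)  # the Spark version string"
],
"execution_count": null,
"outputs": []
},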
{
"cell_type": "markdown",
"metadata": {
"id": "A2HGl1xmN7pI"
},
"source": [
"## **Step 4: a HelloWorld Spark app**\r\n",
"\r\n",
"Our first Spark application; it takes a list of numbers and squares each element and returns the list of squared numbers"
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "SnR5C9jhBHsB",
"outputId": "1eda2eff-1e7b-44d5-a1ee-81cfc278a5c4"
},
"source": [
"def main1():\r\n",
" _, sc = init_spark()\r\n",
" nums = sc.parallelize([1, 2, 3, 4])\r\n",
" print(nums.map(lambda x: x*x).collect())\r\n",
"\r\n",
"if __name__ == '__main__':\r\n",
" main1()"
],
"execution_count": 6,
"outputs": [
{
"output_type": "stream",
"text": [
"[1, 4, 9, 16]\n"
],
"name": "stdout"
}
]
},
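{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a small follow-up sketch, the same RDD can be chained with further transformations and actions, e.g. summing the squares with `reduce`:"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative follow-up: the sum of the squares, computed with reduce.\n",
"_, sc = init_spark()\n",
"nums = sc.parallelize([1, 2, 3, 4])\n",
"print(nums.map(lambda x: x * x).reduce(lambda a, b: a + b))  # 30"
],
"execution_count": null,
"outputs": []
},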
{
"cell_type": "markdown",
"metadata": {
"id": "QHA_COdYOKnO"
},
"source": [
"## **Step 5: another Saprk app that loades a CSV files into an RDD**\r\n",
"Another simple app that prints the first two lines of from `users.csv.gz`"
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "wlzNiahyCH9w",
"outputId": "da98d618-d850-4121-9113-d7b47a05f9cc"
},
"source": [
"def main2():\r\n",
" _, sc = init_spark()\r\n",
" lines = sc.textFile('datasets/data/TDT4305_S2021/users.csv.gz')\r\n",
" print(lines.take(2))\r\n",
"\r\n",
"if __name__ == '__main__':\r\n",
" main2()"
],
"execution_count": 7,
"outputs": [
{
"output_type": "stream",
"text": [
"['\"Id\"\\t\"Reputation\"\\t\"CreationDate\"\\t\"DisplayName\"\\t\"LastAccessDate\"\\t\"AboutMe\"\\t\"Views\"\\t\"UpVotes\"\\t\"DownVotes\"', \"-1\\t1\\t2014-05-13 21:29:22\\tCommunity\\t2014-05-13 21:29:22\\t<p>Hi, I'm not really a person.</p>&#xA;&#xA;<p>I'm a background process that helps keep this site clean!</p>&#xA;&#xA;<p>I do things like</p>&#xA;&#xA;<ul>&#xA;<li>Randomly poke old unanswered questions every hour so they get some attention</li>&#xA;<li>Own community questions and answers so nobody\\t3\\t819\\t1575\"]\n"
],
"name": "stdout"
}
]
},
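{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the file is tab-separated with a quoted header row, a natural next step is to split each line into fields. A minimal sketch (dropping the header and stripping the surrounding quotes is an assumption based on the output above):"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative parsing sketch: split the tab-separated lines into fields.\n",
"_, sc = init_spark()\n",
"lines = sc.textFile('datasets/data/TDT4305_S2021/users.csv.gz')\n",
"header = lines.first()\n",
"rows = (lines.filter(lambda line: line != header)\n",
"             .map(lambda line: [f.strip('\"') for f in line.split('\\t')]))\n",
"print(rows.take(1))"
],
"execution_count": null,
"outputs": []
},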
{
"cell_type": "markdown",
"metadata": {
"id": "fAEwctyiTkju"
},
"source": [
"## **Step 6: sample GraphFrames code**"
]
},
{
"cell_type": "code",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "sIRkcwMHTvyH",
"outputId": "db5c7202-47bf-4583-e594-b3eaff28d8e4"
},
"source": [
"from pyspark.sql import SparkSession\r\n",
"from pyspark.sql import SQLContext\r\n",
"\r\n",
"_, sc = init_spark()\r\n",
"sqlContext = SQLContext(sc)\r\n",
"\r\n",
"## the rest of this code (down below) comes from: https://graphframes.github.io/graphframes/docs/_site/quick-start.html#getting-started-with-apache-spark-and-spark-packages\r\n",
"\r\n",
"# Create a Vertex DataFrame with unique ID column \"id\"\r\n",
"v = sqlContext.createDataFrame([\r\n",
" (\"a\", \"Alice\", 34),\r\n",
" (\"b\", \"Bob\", 36),\r\n",
" (\"c\", \"Charlie\", 30),\r\n",
"], [\"id\", \"name\", \"age\"])\r\n",
"\r\n",
"# Create an Edge DataFrame with \"src\" and \"dst\" columns\r\n",
"e = sqlContext.createDataFrame([\r\n",
" (\"a\", \"b\", \"friend\"),\r\n",
" (\"b\", \"c\", \"follow\"),\r\n",
" (\"c\", \"b\", \"follow\"),\r\n",
"], [\"src\", \"dst\", \"relationship\"])\r\n",
"\r\n",
"# Create a GraphFrame\r\n",
"from graphframes import *\r\n",
"g = GraphFrame(v, e)\r\n",
"\r\n",
"# Query: Get in-degree of each vertex.\r\n",
"g.inDegrees.show()\r\n",
"\r\n",
"# Query: Count the number of \"follow\" connections in the graph.\r\n",
"g.edges.filter(\"relationship = 'follow'\").count()\r\n",
"\r\n",
"# Run PageRank algorithm, and show results.\r\n",
"#results = g.pageRank(resetProbability=0.01, maxIter=10)\r\n",
"#results.vertices.select(\"id\", \"pagerank\").show()"
],
"execution_count": 8,
"outputs": [
{
"output_type": "stream",
"text": [
"+---+--------+\n",
"| id|inDegree|\n",
"+---+--------+\n",
"| c| 1|\n",
"| b| 2|\n",
"+---+--------+\n",
"\n"
],
"name": "stdout"
},
{
"output_type": "execute_result",
"data": {
"text/plain": [
"2"
]
},
"metadata": {
"tags": []
},
"execution_count": 8
}
]
},
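{
"cell_type": "markdown",
"metadata": {},
"source": [
"GraphFrames also supports motif finding, a small pattern-matching DSL over the graph. A minimal sketch on the `g` defined above (the pattern looks for pairs of vertices connected by edges in both directions):"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative motif query: find pairs of vertices with edges in both directions.\n",
"motifs = g.find(\"(a)-[e]->(b); (b)-[e2]->(a)\")\n",
"motifs.show()"
],
"execution_count": null,
"outputs": []
},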
{
"cell_type": "markdown",
"metadata": {
"id": "vgf8F1ElW6sz"
},
"source": [
"### See: https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5 for more examples"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "k3jnIXx0OOI2"
},
"source": [
"\r\n",
"## **Step 7 and beyond: create your apps down here (as many as you need)**"
]
},
{
"cell_type": "code",
"metadata": {
"id": "Y1BQDFJSJxG6"
},
"source": [
"## Add your code here"
],
"execution_count": 9,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "6jubenTqLZim"
},
"source": [
"## Add your code here"
],
"execution_count": 10,
"outputs": []
}
]
}