{
"cells": [
{
"cell_type": "markdown",
"source": [
"In the first step, we declare a `SparkSession` and get `SparkContext` which will be used in further action. `SparkConf` help us to define `SparkSession` configurations; we can use the internal hostname in the spark master URI: `spark://spark-master:7077` since we've deployed the spark cluster and jupyterlab in the same docker virtual network." | |
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.conf import SparkConf\n",
"\n",
"# memory sizes are Spark configuration properties, so they are passed\n",
"# through set() rather than setExecutorEnv(), which sets environment variables\n",
"conf = SparkConf()\\\n",
"    .setMaster(\"spark://spark-master:7077\")\\\n",
"    .setAppName(\"ParallelWordCounting\")\\\n",
"    .set(\"spark.executor.memory\", \"1024m\")\\\n",
"    .set(\"spark.driver.memory\", \"1024m\")\n",
"\n",
"spark = SparkSession.builder.config(conf=conf).getOrCreate()\n",
"sc = spark.sparkContext"
],
"outputs": [],
"metadata": {}
},
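{
"cell_type": "markdown",
"source": [
"As a quick sanity check (an editorial addition, not part of the original walkthrough), we can ask the live `SparkContext` which settings actually took effect; `sc.getConf().getAll()` is standard PySpark API and returns the resolved key-value pairs."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# print the resolved Spark configuration of the running session\n",
"for key, value in sorted(sc.getConf().getAll()):\n",
"    print(f\"{key} = {value}\")"
],
"outputs": [],
"metadata": {}
},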
{
"cell_type": "code",
"execution_count": 11,
"source": [
"sc"
],
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"<SparkContext master=spark://spark-master:7077 appName=ParallelWordCounting>"
],
"text/html": [
"\n",
"        <div>\n",
"            <p><b>SparkContext</b></p>\n",
"\n",
"            <p><a href=\"http://90565f2072fc:4040\">Spark UI</a></p>\n",
"\n",
"            <dl>\n",
"              <dt>Version</dt>\n",
"                <dd><code>v3.0.0</code></dd>\n",
"              <dt>Master</dt>\n",
"                <dd><code>spark://spark-master:7077</code></dd>\n",
"              <dt>AppName</dt>\n",
"                <dd><code>ParallelWordCounting</code></dd>\n",
"            </dl>\n",
"        </div>\n",
"        "
]
},
"metadata": {},
"execution_count": 11
}
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"### Text processing\n",
"\n",
"We are going to load a raw text file from tesseract dataset: https://github.com/tesseractocr/langdata_lstm/blob/master/vie/vie.training_text. This file contains 280627 lines of the Vietnamese language. Our goal is to get the most frequent word in this file." | |
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 55,
"source": [
"!wget https://raw.githubusercontent.com/tesseract-ocr/langdata_lstm/master/vie/vie.training_text" | |
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"--2021-09-09 15:55:20-- https://raw.githubusercontent.com/tesseract-ocr/langdata_lstm/master/vie/vie.training_text\n", | |
"Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...\n", | |
"Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.\n", | |
"HTTP request sent, awaiting response... 200 OK\n", | |
"Length: 21889986 (21M) [text/plain]\n", | |
"Saving to: ‘vie.training_text’\n", | |
"\n", | |
"vie.training_text 100%[===================>] 20.88M 1.60MB/s in 15s \n", | |
"\n", | |
"2021-09-09 15:55:38 (1.35 MB/s) - ‘vie.training_text’ saved [21889986/21889986]\n", | |
"\n" | |
] | |
} | |
], | |
"metadata": {} | |
}, | |
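{
"cell_type": "markdown",
"source": [
"Optionally (an added check, not in the original gist), we can confirm the line count quoted above with a one-liner:"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# count the lines in the downloaded file; the text above quotes 280,627\n",
"with open(\"vie.training_text\", \"r\") as f:\n",
"    print(sum(1 for _ in f))"
],
"outputs": [],
"metadata": {}
},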
{
"cell_type": "markdown",
"source": [
"Next, we declare a straightforward normalization step that converts all characters to lowercase and removes all punctuation."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 12,
"source": [
"import re\n",
"from string import punctuation\n",
"\n",
"def normalize(x):\n",
"    # lowercase, then remove every ASCII punctuation character\n",
"    x = x.lower()\n",
"    return re.sub(r\"[\" + punctuation + \"]\", \"\", x)"
],
"outputs": [],
"metadata": {}
},
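{
"cell_type": "markdown",
"source": [
"A quick illustration (added for clarity; the sample sentence is made up) of what `normalize` does:"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# punctuation is stripped and case is folded; Vietnamese diacritics are kept\n",
"print(normalize(\"Xin chào, Việt Nam!\"))  # -> xin chào việt nam"
],
"outputs": [],
"metadata": {}
},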
{
"cell_type": "markdown",
"source": [
"We now implement a naive algorithm that iterates through all of the text lines and counting sequentially." | |
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
"source": [
"def count_sequential():\n", | |
" d = dict()\n", | |
" with open(\"vie.training_text\", \"r\") as stream:\n", | |
" for line in stream.read().split():\n", | |
" line = normalize(line.strip())\n", | |
" line = line.split(\" \")\n", | |
" for w in line:\n", | |
" if w not in d:\n", | |
" d[w] = 1\n", | |
" else:\n", | |
" d[w] += 1\n", | |
"\n", | |
" d = {k: v for k, v in sorted(d.items(), reverse=True, key=lambda item: item[1])}\n", | |
" return list(d.items())[:20]" | |
],
"outputs": [],
"metadata": {}
},
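{
"cell_type": "markdown",
"source": [
"As an aside (an editorial addition, not part of the original gist), the standard library's `collections.Counter` expresses the same sequential tally more compactly:"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"from collections import Counter\n",
"\n",
"def count_sequential_counter():\n",
"    # Counter.update tallies an iterable of words; most_common sorts for us\n",
"    c = Counter()\n",
"    with open(\"vie.training_text\", \"r\") as stream:\n",
"        for line in stream:\n",
"            c.update(normalize(line.strip()).split(\" \"))\n",
"    return c.most_common(20)"
],
"outputs": [],
"metadata": {}
},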
{
"cell_type": "markdown",
"source": [
"The function below is the map-reduce algorithm using PySpark. First, we load `vie.training_text` to RDD using `sc.textFile()`. Second, each text line will be normalized and split into a list of words. Then, we create a pair `(word, 1)` for each word and aggregate them using `reduceByKey()` function. The last step is just sorting the output RDD." | |
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 14,
"source": [
"def count_mapreduce():\n",
"    text_rdd = sc.textFile(\"vie.training_text\")\n",
"    text_rdd = text_rdd.flatMap(lambda line: normalize(line.strip()).split(\" \"))\\\n",
"        .map(lambda word: (word, 1))\\\n",
"        .reduceByKey(lambda a, b: a + b)\\\n",
"        .sortBy(lambda a: a[1], ascending=False).collect()\n",
"\n",
"    return text_rdd[:20]  # return top-20 frequent words"
],
"outputs": [],
"metadata": {}
},
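{
"cell_type": "markdown",
"source": [
"To make the `reduceByKey()` step concrete, here is a toy run of the same pipeline on a tiny in-memory RDD (an illustrative addition; the sample lines are made up):"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# same map-reduce shape as count_mapreduce(), on two hand-written lines\n",
"toy = sc.parallelize([\"a b a\", \"b a\"])\n",
"pairs = toy.flatMap(lambda line: line.split(\" \"))\\\n",
"    .map(lambda word: (word, 1))\\\n",
"    .reduceByKey(lambda a, b: a + b)\\\n",
"    .sortBy(lambda a: a[1], ascending=False)\n",
"print(pairs.collect())  # expect [('a', 3), ('b', 2)]"
],
"outputs": [],
"metadata": {}
},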
{
"cell_type": "markdown",
"source": [
"It is time to running the above functions and see the result." | |
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 15,
"source": [
"import time\n",
"t0 = time.time()\n",
"res = count_sequential()\n",
"t1 = time.time()\n",
"print(f\"Sequential: {t1 - t0}s\\n{res}\")"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Sequential: 5.693771600723267s\n",
"[('và', 73333), ('', 37385), ('các', 35065), ('với', 31060), ('để', 21425), ('của', 20510), ('nhà', 20413), ('liên', 18289), ('số', 17943), ('haritası', 16596), ('thông', 16207), ('không', 16096), ('bảo', 14864), ('giá', 14613), ('ı', 14471), ('văn', 13621), ('thường', 13317), ('có', 12681), ('đã', 12602), ('bản', 12517)]\n"
]
}
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 18,
"source": [
"import time\n",
"t0 = time.time()\n",
"res = count_mapreduce()\n",
"t1 = time.time()\n",
"print(f\"MapReduce: {t1 - t0}s\\n{res}\")"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
" \r"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"MapReduce: 3.287429094314575s\n",
"[('và', 73333), ('', 37385), ('các', 35065), ('với', 31060), ('để', 21425), ('của', 20510), ('nhà', 20413), ('liên', 18289), ('số', 17943), ('haritası', 16596), ('thông', 16207), ('không', 16096), ('bảo', 14864), ('giá', 14613), ('ı', 14471), ('văn', 13621), ('thường', 13317), ('có', 12681), ('đã', 12602), ('bản', 12517)]\n"
]
}
],
"metadata": {}
},
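{
"cell_type": "markdown",
"source": [
"The speedup over the sequential baseline depends on how many partitions, and hence parallel tasks, Spark creates for the file; we can inspect that directly (an added check, not in the original gist):"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# number of input splits Spark created for this file,\n",
"# i.e. the upper bound on task-level parallelism for the job\n",
"print(sc.textFile(\"vie.training_text\").getNumPartitions())"
],
"outputs": [],
"metadata": {}
},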
{
"cell_type": "code",
"execution_count": 8,
"source": [
"sc.stop()\n",
"spark.stop()"
],
"outputs": [],
"metadata": {}
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
} |