{
"cells": [
{
"cell_type": "markdown",
"source": [
"In the first step, we declare a `SparkSession` and get `SparkContext` which will be used in further action. `SparkConf` help us to define `SparkSession` configurations; we can use the internal hostname in the spark master URI: `spark://spark-master:7077` since we've deployed the spark cluster and jupyterlab in the same docker virtual network."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 10,
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.conf import SparkConf\n",
"\n",
"conf = SparkConf()\\\n",
" .setMaster(\"spark://spark-master:7077\")\\\n",
" .setAppName(\"ParallelWordCounting\")\\\n",
" .setExecutorEnv(\"spark.executor.memory\", \"1024m\")\\\n",
" .setExecutorEnv(\"spark.driver.memory\", \"1024m\")\n",
"\n",
"spark = SparkSession.builder.config(conf=conf).getOrCreate()\n",
"sc = spark.sparkContext"
],
"outputs": [],
"metadata": {}
},
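{
"cell_type": "markdown",
"source": [
"A side note on the configuration above: `setExecutorEnv()` sets environment variables on the executor processes, while memory limits such as `spark.executor.memory` and `spark.driver.memory` are Spark configuration properties that are normally passed through `SparkConf.set()`. A minimal sketch of that variant (same master URI and sizes as above; note that `spark.driver.memory` only takes effect if it is set before the driver JVM starts):"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# Sketch: the same settings expressed as Spark configuration properties\n",
"# rather than executor environment variables.\n",
"alt_conf = SparkConf()\\\n",
"    .setMaster(\"spark://spark-master:7077\")\\\n",
"    .setAppName(\"ParallelWordCounting\")\\\n",
"    .set(\"spark.executor.memory\", \"1024m\")\\\n",
"    .set(\"spark.driver.memory\", \"1024m\")"
],
"outputs": [],
"metadata": {}
},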
{
"cell_type": "code",
"execution_count": 11,
"source": [
"sc"
],
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"<SparkContext master=spark://spark-master:7077 appName=ParallelWordCounting>"
],
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://90565f2072fc:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v3.0.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>spark://spark-master:7077</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>ParallelWordCounting</code></dd>\n",
" </dl>\n",
" </div>\n",
" "
]
},
"metadata": {},
"execution_count": 11
}
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"### Text processing\n",
"\n",
"We are going to load a raw text file from tesseract dataset: https://github.com/tesseractocr/langdata_lstm/blob/master/vie/vie.training_text. This file contains 280627 lines of the Vietnamese language. Our goal is to get the most frequent word in this file."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 55,
"source": [
"!wget https://raw.githubusercontent.com/tesseract-ocr/langdata_lstm/master/vie/vie.training_text"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"--2021-09-09 15:55:20-- https://raw.githubusercontent.com/tesseract-ocr/langdata_lstm/master/vie/vie.training_text\n",
"Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...\n",
"Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.\n",
"HTTP request sent, awaiting response... 200 OK\n",
"Length: 21889986 (21M) [text/plain]\n",
"Saving to: ‘vie.training_text’\n",
"\n",
"vie.training_text 100%[===================>] 20.88M 1.60MB/s in 15s \n",
"\n",
"2021-09-09 15:55:38 (1.35 MB/s) - ‘vie.training_text’ saved [21889986/21889986]\n",
"\n"
]
}
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"Next, we declare a straightforward normalization step that converts all characters to lowercase and removes all punctuation."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 12,
"source": [
"import re\n",
"from string import punctuation\n",
"\n",
"def normalize(x):\n",
" x = x.lower()\n",
" return re.sub(r\"[\" + punctuation + \"]\", \"\", x)"
],
"outputs": [],
"metadata": {}
},
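{
"cell_type": "markdown",
"source": [
"A quick sanity check of `normalize()` on a made-up sample string (the expected output assumes Python's `string.punctuation`, which covers ASCII punctuation only):"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"# Hypothetical sample input, purely for illustration.\n",
"normalize(\"Xin chào, PySpark!\")  # -> 'xin chào pyspark'"
],
"outputs": [],
"metadata": {}
},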
{
"cell_type": "markdown",
"source": [
"We now implement a naive algorithm that iterates through all of the text lines and counting sequentially."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 13,
"source": [
"def count_sequential():\n",
" d = dict()\n",
" with open(\"vie.training_text\", \"r\") as stream:\n",
" for line in stream.read().split():\n",
" line = normalize(line.strip())\n",
" line = line.split(\" \")\n",
" for w in line:\n",
" if w not in d:\n",
" d[w] = 1\n",
" else:\n",
" d[w] += 1\n",
"\n",
" d = {k: v for k, v in sorted(d.items(), reverse=True, key=lambda item: item[1])}\n",
" return list(d.items())[:20]"
],
"outputs": [],
"metadata": {}
},
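{
"cell_type": "markdown",
"source": [
"For reference, the same sequential bookkeeping can be written more compactly with `collections.Counter`. This sketch mirrors `count_sequential()` exactly, including the empty string that punctuation-only tokens normalize to:"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"from collections import Counter\n",
"\n",
"def count_sequential_counter():\n",
"    # most_common() does the counting and the descending sort in one step.\n",
"    with open(\"vie.training_text\", \"r\") as stream:\n",
"        words = [normalize(w) for w in stream.read().split()]\n",
"    return Counter(words).most_common(20)"
],
"outputs": [],
"metadata": {}
},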
{
"cell_type": "markdown",
"source": [
"The function below is the map-reduce algorithm using PySpark. First, we load `vie.training_text` to RDD using `sc.textFile()`. Second, each text line will be normalized and split into a list of words. Then, we create a pair `(word, 1)` for each word and aggregate them using `reduceByKey()` function. The last step is just sorting the output RDD."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 14,
"source": [
"def count_mapreduce():\n",
" text_rdd = sc.textFile(\"vie.training_text\")\n",
" text_rdd = text_rdd.flatMap(lambda line: normalize(line.strip()).split(\" \"))\\\n",
" .map(lambda word: (word, 1))\\\n",
" .reduceByKey(lambda a, b: a + b)\\\n",
" .sortBy(lambda a: a[1], ascending=False).collect()\n",
" \n",
" return text_rdd[:20] # return top-20 frequent words"
],
"outputs": [],
"metadata": {}
},
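{
"cell_type": "markdown",
"source": [
"Since we only need the top 20 words, collecting the fully sorted RDD to the driver is more work than necessary. `takeOrdered()` lets each partition keep only its local top-k before merging on the driver; a minimal variant of `count_mapreduce()` using it might look like this:"
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"def count_mapreduce_topk():\n",
"    # takeOrdered(n, key=...) returns the n smallest elements by key,\n",
"    # so negating the count yields the most frequent words first.\n",
"    return sc.textFile(\"vie.training_text\")\\\n",
"        .flatMap(lambda line: normalize(line.strip()).split(\" \"))\\\n",
"        .map(lambda word: (word, 1))\\\n",
"        .reduceByKey(lambda a, b: a + b)\\\n",
"        .takeOrdered(20, key=lambda kv: -kv[1])"
],
"outputs": [],
"metadata": {}
},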
{
"cell_type": "markdown",
"source": [
"It is time to running the above functions and see the result."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 15,
"source": [
"import time\n",
"t0 = time.time()\n",
"res = count_sequential()\n",
"t1 = time.time()\n",
"print(f\"Sequential: {t1 - t0}s\\n{res}\")"
],
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Sequential: 5.693771600723267s\n",
"[('và', 73333), ('', 37385), ('các', 35065), ('với', 31060), ('để', 21425), ('của', 20510), ('nhà', 20413), ('liên', 18289), ('số', 17943), ('haritası', 16596), ('thông', 16207), ('không', 16096), ('bảo', 14864), ('giá', 14613), ('ı', 14471), ('văn', 13621), ('thường', 13317), ('có', 12681), ('đã', 12602), ('bản', 12517)]\n"
]
}
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 18,
"source": [
"import time\n",
"t0 = time.time()\n",
"res = count_mapreduce()\n",
"t1 = time.time()\n",
"print(f\"MapReduce: {t1 - t0}s\\n{res}\")"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
" \r"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"MapReduce: 3.287429094314575s\n",
"[('và', 73333), ('', 37385), ('các', 35065), ('với', 31060), ('để', 21425), ('của', 20510), ('nhà', 20413), ('liên', 18289), ('số', 17943), ('haritası', 16596), ('thông', 16207), ('không', 16096), ('bảo', 14864), ('giá', 14613), ('ı', 14471), ('văn', 13621), ('thường', 13317), ('có', 12681), ('đã', 12602), ('bản', 12517)]\n"
]
}
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": 8,
"source": [
"sc.stop()\n",
"spark.stop()"
],
"outputs": [],
"metadata": {}
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}