{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Serverless Data Lake Immersion\n",
"## Lab 2.3 - Advanced Data Preparation with Developer Endpoints and Notebook\n",
"(Revision History: \n",
"PA2, 2018-12-13, @akirmak \n",
"PA1, 2018-12-07\n",
"\n",
"This example shows how to do joins and filters with transforms on DynamicFrames.\n",
"\n",
"For purposes of our Immersion Day, we are assuming that you have done the previous Lab assignments (Create Firehose delivery stream, ingest simulated product catalogue data to S3, crawled this data and put the results into a database called `<your initials>-tame-bda-immersion-gdb` and a table called `raw` in your Data Catalog, as described in the lab guide.\n",
"\n",
"### 2. Getting started\n",
"\n",
"DataFrames APIs support elaborate methods for slicing-and-dicing the data. It includes operations such as \"selecting\" rows, columns, and cells by name or by number, filtering out rows, etc. Statistical data is usually very messy and contains lots of missing and incorrect values and range violations. So a critically important feature of DataFrames is the explicit management of missing data.\n",
"\n",
"We will write a script that:\n",
"\n",
"1. ...\n",
" ...\n",
"2. ...\n",
"\n",
"Begin by running some boilerplate to import the AWS Glue libraries we'll need and set up a single `GlueContext`.\n",
"Then, start a Spark application and create dynamic frame from our the data in S3. \n",
"\n",
"Some concepts:\n",
"\n",
"- DataFrames are designed to process a large collection of structured as well as semi-structured data.\n",
"\n",
"- ...\n",
"\n",
"**Important** Before running the next step, update the database name with your initials (e.g. fs-tame-bda-immersion-gdb for Frank Sinatra)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"from awsglue.transforms import *\n",
"from awsglue.utils import getResolvedOptions\n",
"from pyspark.context import SparkContext\n",
"from awsglue.context import GlueContext\n",
"from awsglue.job import Job\n",
"\n",
"glueContext = GlueContext(SparkContext.getOrCreate())\n",
"\n",
"\n",
"spark = glueContext.spark_session\n",
"\n",
"datasource0 = glueContext.create_dynamic_frame.from_catalog(\n",
" database = \"<your initials>-tame-bda-immersion-gdb\", \n",
" table_name = \"raw\", \n",
" transformation_ctx = \"datasource0\")"
]
},
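{
"cell_type": "markdown",
"metadata": {},
"source": [
"This lab moves back and forth between AWS Glue DynamicFrames and Spark DataFrames. As a quick reference, here is a minimal sketch (not required for the lab) of converting in both directions; `datasource0` is the DynamicFrame created above, and the name passed to `fromDF` is an arbitrary label."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch: converting between DynamicFrames and Spark DataFrames\n",
"from awsglue.dynamicframe import DynamicFrame\n",
"\n",
"# DynamicFrame -> Spark DataFrame\n",
"df_preview = datasource0.toDF()\n",
"\n",
"# Spark DataFrame -> DynamicFrame (the name 'df_preview_dyf' is an arbitrary label)\n",
"dyf_again = DynamicFrame.fromDF(df_preview, glueContext, 'df_preview_dyf')"
]
},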
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3. Schema of the Dataset\n",
"Next, you can easily examine the schemas that the crawler recorded in the Data Catalog. For example, to see the schema of the `raw` table, run the following code:\n",
"\n",
"Note: To have a look at the schema, i.e. the structure of the DataFrame, we'll use the printSchema method. This will give us the different columns in our DataFrame, along with the data type and the nullable conditions for that particular column\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Count: 84500\n",
"root\n",
" |-- productName: string (nullable = true)\n",
" |-- color: string (nullable = true)\n",
" |-- department: string (nullable = true)\n",
" |-- product: string (nullable = true)\n",
" |-- imageUrl: string (nullable = true)\n",
" |-- dateSoldSince: string (nullable = true)\n",
" |-- dateSoldUntil: string (nullable = true)\n",
" |-- price: integer (nullable = true)\n",
" |-- campaign: string (nullable = true)\n",
" |-- year: string (nullable = true)\n",
" |-- month: string (nullable = true)\n",
" |-- day: string (nullable = true)\n",
" |-- hour: string (nullable = true)\n",
"\n",
"+--------------------+--------+----------+--------+--------------------+--------------------+--------------------+-----+-----------+----+-----+---+----+\n",
"| productName| color|department| product| imageUrl| dateSoldSince| dateSoldUntil|price| campaign|year|month|day|hour|\n",
"+--------------------+--------+----------+--------+--------------------+--------------------+--------------------+-----+-----------+----+-----+---+----+\n",
"|Incredible Granit...|sky blue| Health| Shirt|http://lorempixel...|Fri Aug 24 2018 2...|Sun Mar 03 2019 0...| 10| NONE|2018| 12| 12| 22|\n",
"|Fantastic Frozen ...| purple| Books| Towels|http://lorempixel...|Sun Mar 04 2018 2...|Mon Oct 14 2019 2...| 75|BlackFriday|2018| 12| 12| 22|\n",
"|Intelligent Concr...| green|Automotive| Gloves|http://lorempixel...|Tue Nov 13 2018 0...|Sun Dec 01 2019 2...| 54| 10Percent|2018| 12| 12| 22|\n",
"|Tasty Concrete Ch...| pink| Movies|Keyboard|http://lorempixel...|Wed Jun 20 2018 0...|Fri Feb 22 2019 1...| 35| NONE|2018| 12| 12| 22|\n",
"|Fantastic Metal Ball| indigo| Clothing| Gloves|http://lorempixel...|Wed Aug 08 2018 0...|Wed Feb 20 2019 0...| 23| 10Percent|2018| 12| 12| 22|\n",
"+--------------------+--------+----------+--------+--------------------+--------------------+--------------------+-----+-----------+----+-----+---+----+\n",
"only showing top 5 rows"
]
}
],
"source": [
"print \"Count: \", datasource0.count()\n",
"\n",
"df = datasource0.toDF()\n",
"\n",
"df.printSchema()\n",
"\n",
"df.show(5)\n"
]
},
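{
"cell_type": "markdown",
"metadata": {},
"source": [
"Section 2 mentioned that a key feature of DataFrames is the explicit management of missing data. The simulated catalogue data may well contain no nulls at all, so the cell below is only a sketch of the API: it counts missing values per column and shows how rows with a missing `price` could be dropped or filled with a default."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as F\n",
"\n",
"# Count null values in each column (likely all zeros for this simulated dataset)\n",
"df.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()\n",
"\n",
"# Drop rows with a missing price, or fill in a default value instead\n",
"df_clean = df.dropna(subset=['price'])\n",
"df_filled = df.fillna({'price': 0})\n",
"print('Rows after dropna: %d' % df_clean.count())"
]
},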
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4. Selecting Multiple Columns & Filtering Data\n",
"We can filter our data based on multiple conditions "
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------+-----------+-----+-----------+\n",
"| productName| product| department|price| campaign|\n",
"+--------------------+--------+-----------+-----+-----------+\n",
"|Fantastic Frozen ...| Towels| Books| 75|BlackFriday|\n",
"|Unbranded Soft Ch...| Gloves| Games| 14|BlackFriday|\n",
"|Ergonomic Plastic...| Mouse| Jewelery| 149|BlackFriday|\n",
"|Practical Granite...|Keyboard| Grocery| 125|BlackFriday|\n",
"| Licensed Soft Fish| Salad| Garden| 32|BlackFriday|\n",
"| Rustic Frozen Salad|Computer| Tools| 50|BlackFriday|\n",
"|Refined Plastic C...| Bacon| Books| 137|BlackFriday|\n",
"|Intelligent Rubbe...| Cheese|Electronics| 86|BlackFriday|\n",
"|Rustic Plastic Shirt| Shoes| Outdoors| 53|BlackFriday|\n",
"|Handcrafted Fresh...| Towels| Clothing| 61|BlackFriday|\n",
"| Awesome Metal Pizza| Shirt| Baby| 28|BlackFriday|\n",
"|Intelligent Fresh...| Tuna| Grocery| 141|BlackFriday|\n",
"|Practical Rubber ...|Sausages| Music| 71|BlackFriday|\n",
"| Awesome Fresh Hat| Soap| Tools| 139|BlackFriday|\n",
"| Rustic Fresh Chair|Keyboard| Industrial| 15|BlackFriday|\n",
"|Incredible Rubber...| Gloves| Music| 44|BlackFriday|\n",
"|Licensed Fresh To...| Tuna| Baby| 89|BlackFriday|\n",
"|Practical Steel S...| Pizza| Home| 134|BlackFriday|\n",
"| Tasty Plastic Pants| Towels| Sports| 142|BlackFriday|\n",
"|Licensed Rubber P...| Towels| Automotive| 139|BlackFriday|\n",
"+--------------------+--------+-----------+-----+-----------+\n",
"only showing top 20 rows"
]
}
],
"source": [
"df.filter((df.campaign=='BlackFriday')).select('productName','product', 'department', 'price','campaign').show()\n"
]
},
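{
"cell_type": "markdown",
"metadata": {},
"source": [
"As promised above, here is a sketch that combines several conditions with `&` (and); `|` (or) and `~` (not) work the same way. The price threshold of 100 is arbitrary and only for illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Combine conditions: BlackFriday items priced above an arbitrary threshold\n",
"filtered = df.filter((df.campaign == 'BlackFriday') & (df.price > 100))\n",
"filtered.select('productName', 'product', 'department', 'price', 'campaign').show(5)"
]
},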
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5. Perform transformations on data\n",
"\n",
"You can easily transform data.\n",
"\n",
"Let's only keep the fields that we want and rename `imageUrl` to `thumbnailImageUrl`. The dataset is small enough that we can look at the whole thing. The `toDF()` converts a DynamicFrame to a Spark DataFrame, so we can apply the\n",
"transforms in SparkSQL."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- productName: string (nullable = true)\n",
" |-- department: string (nullable = true)\n",
" |-- product: string (nullable = true)\n",
" |-- dateSoldSince: string (nullable = true)\n",
" |-- dateSoldUntil: string (nullable = true)\n",
" |-- price: integer (nullable = true)\n",
" |-- year: string (nullable = true)\n",
" |-- month: string (nullable = true)\n",
" |-- day: string (nullable = true)\n",
" |-- thumbnailImageUrl: string (nullable = true)\n",
" |-- campaignType: string (nullable = true)\n",
"\n",
"+--------------------+-----------+--------+--------------------+--------------------+-----+----+-----+---+--------------------+------------+\n",
"| productName| department| product| dateSoldSince| dateSoldUntil|price|year|month|day| thumbnailImageUrl|campaignType|\n",
"+--------------------+-----------+--------+--------------------+--------------------+-----+----+-----+---+--------------------+------------+\n",
"|Incredible Granit...| Health| Shirt|Fri Aug 24 2018 2...|Sun Mar 03 2019 0...| 10|2018| 12| 12|http://lorempixel...| NONE|\n",
"|Fantastic Frozen ...| Books| Towels|Sun Mar 04 2018 2...|Mon Oct 14 2019 2...| 75|2018| 12| 12|http://lorempixel...| BlackFriday|\n",
"|Intelligent Concr...| Automotive| Gloves|Tue Nov 13 2018 0...|Sun Dec 01 2019 2...| 54|2018| 12| 12|http://lorempixel...| 10Percent|\n",
"|Tasty Concrete Ch...| Movies|Keyboard|Wed Jun 20 2018 0...|Fri Feb 22 2019 1...| 35|2018| 12| 12|http://lorempixel...| NONE|\n",
"|Fantastic Metal Ball| Clothing| Gloves|Wed Aug 08 2018 0...|Wed Feb 20 2019 0...| 23|2018| 12| 12|http://lorempixel...| 10Percent|\n",
"|Practical Frozen ...| Games| Pants|Thu Mar 29 2018 0...|Sat Sep 14 2019 1...| 16|2018| 12| 12|http://lorempixel...| 10Percent|\n",
"|Unbranded Soft Ch...| Games| Gloves|Fri Nov 23 2018 1...|Wed Mar 27 2019 1...| 14|2018| 12| 12|http://lorempixel...| BlackFriday|\n",
"| Generic Soft Gloves| Computers| Chicken|Tue Jul 03 2018 2...|Sat Mar 02 2019 1...| 65|2018| 12| 12|http://lorempixel...| NONE|\n",
"|Ergonomic Plastic...| Jewelery| Mouse|Mon Jan 15 2018 1...|Sat Feb 23 2019 1...| 149|2018| 12| 12|http://lorempixel...| BlackFriday|\n",
"| Rustic Rubber Car|Electronics| Table|Thu Aug 23 2018 0...|Wed Jul 31 2019 0...| 26|2018| 12| 12|http://lorempixel...| 10Percent|\n",
"+--------------------+-----------+--------+--------------------+--------------------+-----+----+-----+---+--------------------+------------+\n",
"only showing top 10 rows"
]
}
],
"source": [
"dsTransformed = datasource0.drop_fields(['color','hour']).rename_field('imageUrl', 'thumbnailImageUrl').rename_field('campaign', 'campaignType')\n",
"dfTransformed = dsTransformed.toDF()\n",
"\n",
"dfTransformed.printSchema()\n",
"\n",
"dfTransformed.show(10)\n"
]
},
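{
"cell_type": "markdown",
"metadata": {},
"source": [
"The transforms above used DynamicFrame methods. If you prefer SQL, you can also register the converted DataFrame as a temporary view and query it with SparkSQL; the aggregation below (product count and average price per department) is just one illustrative example."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Register the transformed DataFrame as a temporary view and query it with SparkSQL\n",
"dfTransformed.createOrReplaceTempView('products')\n",
"\n",
"spark.sql('SELECT department, COUNT(*) AS num_products, AVG(price) AS avg_price FROM products GROUP BY department ORDER BY avg_price DESC').show(10)"
]
},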
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 6. Export transformed data to S3\n",
"\n",
"Let's export the transformed dataset in the previous section to S3. Convert to Parquet format. The following call writes the table across multiple files to support fast parallel reads when doing analysis later:\n",
"\n",
"**Important** Before running the cell below, make sure you are using the correct S3 path. Make sure you update the S3 path name with your initials (e.g. s3://fs-tame-bda-immersion/output-etl-nb-jobs for Frank Sinatra)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<awsglue.dynamicframe.DynamicFrame object at 0x7fd2ac0566d0>"
]
}
],
"source": [
"glueContext.write_dynamic_frame.from_options(frame = dsTransformed,\n",
" connection_type = \"s3\",\n",
" connection_options = {\"path\": \"s3://<your initials>-tame-bda-immersion/output-etl-nb-jobs\"},\n",
" format = \"parquet\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When execution is finished, go the the S3 folder, and verify that the files are written. For instance: the folder should look something like:\n",
"\n",
"`\n",
"2018-12-07 22:42:56 87705 part-00000-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet\n",
"2018-12-07 22:41:55 87572 part-00000-48a202cd-86eb-4109-b3e6-f7f2bef549ef-c000.snappy.parquet\n",
"2018-11-21 01:32:34 87572 part-00000-7f23bfb7-7a9f-4eee-bd00-4cf7ab085f57-c000.snappy.parquet\n",
"2018-12-07 22:42:56 88180 part-00001-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet\n",
"2018-12-07 22:41:55 88180 part-00001-48a202cd-86eb-4109-b3e6-f7f2bef549ef-c000.snappy.parquet\n",
"2018-11-21 01:32:34 88180 part-00001-7f23bfb7-7a9f-4eee-bd00-4cf7ab085f57-c000.snappy.parquet\n",
"2018-12-07 22:42:56 87545 part-00002-3944ffa1-8917-42f0-93f2-bef5b3c63cca-c000.snappy.parquet\n",
"2018-12-07 22:41:55 87851 part-00002-48a202cd-86eb-4109-b3e6-f7f2bef549ef-c000.snappy.parquet\n",
"2018-11-21 01:32:34 87545 part-00002-7f23bfb7-7a9f-4eee-bd00-4cf7ab085f57-c000.snappy.parquet`"
]
},
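{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also verify the export without leaving the notebook by reading the Parquet files back with Spark. This is an optional sanity check; use the same S3 path (with your initials) that you wrote to above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: read the exported Parquet files back and compare the row count\n",
"dfCheck = spark.read.parquet('s3://<your initials>-tame-bda-immersion/output-etl-nb-jobs')\n",
"print('Exported row count: %d' % dfCheck.count())\n",
"dfCheck.printSchema()"
]
},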
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 7. Repartition Data\n",
"**Important:** Before running the cell below, make sure you are using the correct S3 path.\n",
"\n",
"\n",
"In the previous example, the data was exported to multiple S3 objects in parquet format. Since the data is small, let's combine them in a single partition.\n",
"\n",
"#### 7.1 Combine into a Single Partition\n",
"To put all the history data into a single file, we need to convert it to a data frame, repartition it, and\n",
"write it out.\n",
"\n",
"**Important** Before running the cell below, make sure you are using the correct S3 path. Make sure you update the S3 path name with your initials (e.g. s3://fs-tame-bda-immersion/output-etl-nb-jobs for Frank Sinatra)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"dfSinglePartition = dfTransformed.repartition(1)\n",
"dfSinglePartition.write.parquet('s3://<your initials>-tame-bda-immersion/output-etl-nb-jobs/singlePartition')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When execution is finished, go the the S3 folder, and verify that the files are written. For instance: the folder should look something like:\n",
"\n",
"`2018-12-07 22:55:13 1435146 part-00000-95ad4fb6-d178-47ad-8072-d60d8d8e71fd-c000.snappy.parquet`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 7.2 Repartition Based on a Field\n",
"\n",
"Or if you want to separate it by the `department`:\n",
"\n",
"\n",
"**Important** Before running the cell below, make sure you are using the correct S3 path. Make sure you update the S3 path name with your initials (e.g. s3://fs-tame-bda-immersion/output-etl-nb-jobs/byDepartment for Frank Sinatra)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"dfTransformed.write.parquet(\n",
" 's3://<your initials>-tame-bda-immersion/output-etl-nb-jobs/byDepartment', \n",
" partitionBy=['department'])\n"
]
},
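{
"cell_type": "markdown",
"metadata": {},
"source": [
"When you read the partitioned output back, Spark discovers the `department=...` folders and exposes `department` as a regular column, so a filter on it only scans the matching partition. A minimal sketch (again, adjust the S3 path to your initials):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read the partitioned output; 'department' is reconstructed from the folder names\n",
"dfByDept = spark.read.parquet('s3://<your initials>-tame-bda-immersion/output-etl-nb-jobs/byDepartment')\n",
"\n",
"# Filtering on the partition column only scans the matching folder (e.g. department=Books)\n",
"dfByDept.filter(dfByDept.department == 'Books').show(5)"
]
},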
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Note:**\n",
"Many other types of transformations could be done, such as joining tables. AWS Glue makes it easy to write it to relational databases like Redshift even with semi-structured data. It offers a transform, relationalize(), that flattens DynamicFrames no matter how complex the objects in the frame may be."
]
},
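{
"cell_type": "markdown",
"metadata": {},
"source": [
"Our product catalogue is already flat, so `relationalize()` would simply return a single root table here; the sketch below only illustrates the call shape. It returns a DynamicFrameCollection, and the staging path (shown with the same placeholder convention as above) is where Glue writes intermediate data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch only: relationalize() flattens nested structures into one or more tables.\n",
"# On this flat dataset it returns just the root table.\n",
"dfc = dsTransformed.relationalize('root', 's3://<your initials>-tame-bda-immersion/temp-dir/')\n",
"print(dfc.keys())\n",
"dfc.select('root').toDF().show(5)"
]
},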
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 8. Putting it together\n",
"Great! We now have the final table that we'd like to use for analysis in S3, the storage layer of our Data Lake in a compact, efficient format for analytics, that we can run SQL over in AWS Glue, Athena, or Redshift Spectrum.\n",
" \n",
"Note that, many other types of transformations could be done (e.g. JOIN operations). We leave it to your imagination :) \n",
"\n",
"### 9. [Optional Exercise ]\n",
"Try to write a query yourself."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# [Optional] Write your query here"
]
},
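{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you'd like a nudge, here is one possible answer (certainly not the only one): product count and average price per campaign type, computed on the transformed DataFrame."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# One possible answer: product count and average price per campaign type\n",
"from pyspark.sql import functions as F\n",
"\n",
"avgByCampaign = dfTransformed.groupBy('campaignType').agg(F.count('*').alias('num_products'), F.avg('price').alias('avg_price'))\n",
"avgByCampaign.orderBy('avg_price', ascending=False).show()"
]
},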
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 9. Congratulations! \n",
"You've Finished this lab. \n",
"\n",
"** Very Important:** SageMaker Notebooks run on EC2, and therefore you will be billed by the second unless you save your work (by downloading to your local computer) & terminate the SageMaker notebook instance. \n",
"\n",
"### 10. Cleaning up resources \n",
"\n",
"Please \n",
" 1. download this notebook to your computer by selecting ` File -> Download as -> Notebook (.ipynb)`. \n",
" 1. Terminate this instance. Remember that you can always recreate it from the `AWS Glue Console` by selecting the terminated instance and `Cloning` its configuration.\n",
" \n",
" Thank you.\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Sparkmagic (PySpark)",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}