This small subclass of Pandas' SQLAlchemy-based SQL support for reading/storing tables uses the Postgres-specific COPY FROM method to insert large amounts of data into the database. It is much faster than using INSERT. To achieve this, the table is created in the normal way using SQLAlchemy, but no data is inserted. Instead, the data is saved to a temporary CSV file (using Pandas' mature CSV support) and then loaded into Postgres using psycopg2's support for COPY FROM STDIN.
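A minimal sketch of the approach, for orientation only: it assumes a SQLAlchemy engine backed by psycopg2 and swaps the temporary CSV file for an in-memory buffer; to_postgres_via_copy is a hypothetical helper name, not the gist's actual API.

import io
import pandas as pd

def to_postgres_via_copy(df: pd.DataFrame, table_name: str, engine) -> None:
    # Hypothetical sketch, not the gist's code. Create an empty table with
    # the right schema via Pandas/SQLAlchemy (no rows inserted yet).
    df.head(0).to_sql(table_name, engine, if_exists="replace", index=False)

    # Write the data as CSV using Pandas' CSV writer (in-memory here,
    # instead of the temporary file the gist describes).
    buf = io.StringIO()
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)

    # Stream the CSV into Postgres with psycopg2's COPY ... FROM STDIN.
    raw = engine.raw_connection()
    try:
        with raw.cursor() as cur:
            cur.copy_expert(f'COPY "{table_name}" FROM STDIN WITH (FORMAT CSV)', buf)
        raw.commit()
    finally:
        raw.close()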
# First, let's check whether we have any NaN values in our dataset
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).head().asDict()
>> {'artist': 0,
 'auth': 0,
 'firstName': 0,
 'gender': 0,
 'itemInSession': 0,
 'lastName': 0,
 'length': 0,
 'level': 0,
# Read data into Spark.
# Note: Ideally data should be in a schema-supported format like Parquet,
# which also supports partitioning, something very important when ingesting big data.
# The data may also be placed in a distributed filesystem like HDFS or in a cloud
# provider storage bucket like AWS S3 / Google Cloud Storage for faster reads;
# here we only read from local disk.
data = spark.read.json('mini_sparkify_event_data.json')

# How many user activity rows do we have?
data.count()
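As a hedged illustration of the Parquet route mentioned in the comments above (the output path is a placeholder, not from the original notebook):

# Hypothetical sketch: convert the JSON event log to partitioned Parquet once,
# then run subsequent reads against the columnar copy. 'level' is a
# low-cardinality column in this dataset, chosen so Spark can prune whole
# partition directories at read time.
data.write.mode('overwrite').partitionBy('level').parquet('sparkify_events_parquet/')
data = spark.read.parquet('sparkify_events_parquet/')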
# Create a Spark session if there isn't one, or get the existing one
spark = SparkSession \
    .builder \
    .appName("Sparkify The music streaming platform churn detection") \
    .getOrCreate()

# Check the current Spark config
spark.sparkContext.getConf().getAll()
>> [('spark.app.id', 'local-1569248217329'),
# Import libraries
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import count, when, isnan, isnull, desc_nulls_first, desc, \
    from_unixtime, col, dayofweek, dayofyear, hour, to_date, month
import pyspark.sql.functions as F
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
# sc = SparkContext(appName="Project_workspace")
import nltk
nltk.download(['punkt', 'wordnet', 'averaged_perceptron_tagger'])
import re
import numpy as np
import pandas as pd
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from sklearn.metrics import confusion_matrix
import nltk
nltk.download(['punkt', 'wordnet', 'averaged_perceptron_tagger'])
import re
import pandas as pd
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from sklearn.base import BaseEstimator, TransformerMixin

url_regex = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
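The gist is truncated here; given the imports above, a plausible continuation is a tokenizer that masks URLs before lemmatizing. This is a hedged sketch, not the author's code, and tokenize is a hypothetical name:

def tokenize(text):
    # Hypothetical sketch of how these imports typically combine.
    # Replace URLs with a placeholder token so they don't pollute the vocabulary.
    text = re.sub(url_regex, 'urlplaceholder', text)
    lemmatizer = WordNetLemmatizer()
    # Tokenize, lemmatize, lowercase, and strip surrounding whitespace.
    return [lemmatizer.lemmatize(tok).lower().strip() for tok in word_tokenize(text)]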
import nltk
nltk.download(['punkt', 'wordnet'])
import re
import numpy as np
import pandas as pd
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
######### ansible.cfg FILE ############
[defaults]
inventory = ./dev

######### DEV FILE ############
# Dev file
[servers]
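With inventory = ./dev set in ansible.cfg, ad-hoc commands can target the [servers] group without passing -i. The host entry and command below are hypothetical placeholders, not part of the original gist:

# Hypothetical host entry that would sit under [servers] in ./dev:
# web1.example.com ansible_user=ubuntu
# With the inventory picked up from ansible.cfg, an ad-hoc ping needs no -i flag:
# $ ansible servers -m ping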
def diff_df(df1, df2, how="left"):
    """
    Find the difference of rows between two given dataframes.
    This function is not symmetric, meaning
        diff(x, y) != diff(y, x)
    however
        diff(x, y, how='left') == diff(y, x, how='right')
    Ref: https://stackoverflow.com/questions/18180763/set-difference-for-pandas/40209800#40209800
    """
    # Outer-merge with an indicator column, then keep only the rows that
    # appear on the requested side. (Body reconstructed from the referenced
    # Stack Overflow approach; the gist itself is truncated here.)
    merged = df1.merge(df2, how="outer", indicator=True)
    side = "left_only" if how == "left" else "right_only"
    return merged[merged["_merge"] == side].drop(columns="_merge")