Skip to content

Instantly share code, notes, and snippets.

View snehamehrin's full-sized avatar

Sneha Mehrin snehamehrin

  • League
  • Toronto
View GitHub Profile
@snehamehrin
snehamehrin / Kinesis.py
Last active August 3, 2020 13:49
Kinesis Stack Ingestion
from stackapi import StackAPI
import subprocess
import setup
import boto3
import json
class Kinesis(object):
    """Hold the name of the stream this object refers to.

    Args:
        StreamName: Name of the stream; stored unchanged on the instance.
            Defaults to ``None`` when no name is supplied.
    """

    def __init__(self, StreamName=None):
        # Keep the caller-supplied name available as an attribute.
        self.StreamName = StreamName
# Import pyspark functions
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col
from pyspark.sql.types import TimestampType
#Create a Spark Session
spark = SparkSession \
.builder \
.appName('Stack Overflow ML') \
import boto3
import setup
def create_emr():
client = boto3.client('emr', region_name='us-east-1')
cluster_id = client.run_job_flow(Name='stackoverflow', ReleaseLabel='emr-5.18.0',
Applications=[
{
'Name': 'Spark'
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
ACCESS_KEY_ID = access_key
SECRET_ACCESS_KEY = secret_key
ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")
print ("Mounting", bucket_name)
try:
# Unmount the data in case it was already mounted.
dbutils.fs.unmount("/mnt/%s" % mount_folder)
from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col
from pyspark.sql.types import TimestampType
#Initialize Spark Session
from pyspark.sql import SparkSession
# Build the Spark session (or reuse an existing one via getOrCreate) under the
# application name below, then print a confirmation message.
spark = (
    SparkSession.builder
    .appName('Stack Overflow ML')
    .getOrCreate()
)
print('Session created')
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# Append df_duplicates to the "stackoverflow" table through the
# com.databricks.spark.redshift data source, using the S3 location given as
# `tempdir` and forwarding Spark's S3 credentials.
# NOTE(review): the password in the JDBC URL is blank here - presumably
# redacted; confirm how the real credential is supplied.
(
    df_duplicates.write.format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://redshift-cluster-1.c9lgtyzxfycf.us-east-1.redshift.amazonaws.com:5439/dev?user=awsuser&password=")
    .option("dbtable", "stackoverflow")
    .option("forward_spark_s3_credentials","true")
    .option("tempdir", "s3n://stack-overflow-bucket")
    .mode("append")
    .save()
)
# Set the s3n access-key-id and secret-access-key entries on the Hadoop
# configuration of the active SparkContext. Both are set to empty strings here.
for _s3n_setting in ("fs.s3n.awsAccessKeyId", "fs.s3n.awsSecretAccessKey"):
    sc._jsc.hadoopConfiguration().set(_s3n_setting, "")
#Import All Functions
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col
from pyspark.sql.types import TimestampType
# Submit stack-processing.py with the Redshift JDBC jar and the spark-avro /
# spark-redshift packages. The trailing backslash joins both lines into a
# single command; without it the --packages line would run on its own.
spark-submit --jars RedshiftJDBC42-no-awssdk-1.2.20.1043.jar \
    --packages org.apache.spark:spark-avro_2.11:2.4.3,com.databricks:spark-redshift_2.11:2.0.1 stack-processing.py