Created July 19, 2020 14:03
-
-
Save snehamehrin/e5b8853f96b0548bc6f917af92fe4bb9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Stack Overflow ETL: read raw JSON question records from S3, drop duplicate
# questions, derive human-readable date columns, and write the result back
# to S3 as CSV.

# Import pyspark functions
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col
from pyspark.sql.types import TimestampType

# Create a Spark session — the entry point for all DataFrame operations.
spark = SparkSession \
    .builder \
    .appName('Stack Overflow ML') \
    .getOrCreate()
print('Session created')
sc = spark.sparkContext

# Load the raw firehose stream from the Stack Overflow bucket as plain text,
# one JSON record per line (minPartitions=1).
stack = sc.textFile('s3://stack-overflow-bucket/stack_firehose_stream', 1)
stack.take(5)

# Parse the JSON lines into a DataFrame (schema inferred from the data).
df = spark.read.json(stack)
df.show(5)

# Keep exactly one row per question_id.
df_deduped = df.dropDuplicates(['question_id'])

# Feature engineering: the creation/activity columns hold epoch seconds
# (presumably UTC — TODO confirm against the producer); format them as
# "dd/MM/yyyy HH:00:00" strings, truncated to the hour.
df_deduped = df_deduped.withColumn(
    "created_date",
    F.from_unixtime("creation_date", "dd/MM/yyyy HH:00:00"))
df_deduped = df_deduped.withColumn(
    "last_activity_date_final",
    F.from_unixtime("last_activity_date", "dd/MM/yyyy HH:00:00"))

# Write the cleaned data back to S3 as CSV with a header row, replacing any
# previous output ("overwrite" is the documented save-mode spelling).
df_deduped.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("s3://stack-overflow-bucket/test")
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment