Created
October 21, 2016 13:59
-
-
Save jamesthomson/f75732dcaebb796c3921a939b91b3b4e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%pyspark | |
#read in datafile | |
data = sc.textFile('s3://bucket/clustering/zep_tracks.csv') | |
#read as rdd | |
import csv | |
rdd = data.mapPartitions(lambda x: csv.reader(x)) | |
#convert to dataframe | |
dataframe = rdd.toDF(['artist','artist_id','album','album_id','track','track_id','track_number','track_length', | |
'preview_url','danceability','energy','key','loudness','mode','speechiness','acousticness','instrumentalness', | |
'liveness','valence','tempo','duration_ms','time_signature']) | |
print dataframe.show(n=2) | |
#%pyspark | |
print dataframe.dtypes | |
%pyspark | |
#reformat columns as required | |
dataframe=dataframe.withColumn('danceability', dataframe.danceability.cast("Float")) | |
dataframe=dataframe.withColumn('energy', dataframe.energy.cast("Float")) | |
dataframe=dataframe.withColumn('key', dataframe.key.cast("Float")) | |
dataframe=dataframe.withColumn('loudness', dataframe.loudness.cast("Float")) | |
dataframe=dataframe.withColumn('mode', dataframe.mode.cast("Float")) | |
dataframe=dataframe.withColumn('speechiness', dataframe.speechiness.cast("Float")) | |
dataframe=dataframe.withColumn('acousticness', dataframe.acousticness.cast("Float")) | |
dataframe=dataframe.withColumn('instrumentalness', dataframe.instrumentalness.cast("Float")) | |
dataframe=dataframe.withColumn('liveness', dataframe.liveness.cast("Float")) | |
dataframe=dataframe.withColumn('valence', dataframe.valence.cast("Float")) | |
dataframe=dataframe.withColumn('tempo', dataframe.tempo.cast("Float")) | |
dataframe=dataframe.withColumn('duration_ms', dataframe.duration_ms.cast("Float")) | |
dataframe=dataframe.withColumn('time_signature', dataframe.time_signature.cast("Float")) | |
print dataframe.dtypes | |
#looking at the data | |
%pyspark | |
dataframe.filter(dataframe.energy>1).show() | |
%pyspark | |
import pyspark.sql.functions as func | |
dataframe.groupBy(dataframe.album).avg("loudness").select("album", func.col("avg(loudness)").alias("avg_loudness")).orderBy("avg_loudness", ascending=False).show(truncate=False) | |
%pyspark | |
dataframe.createOrReplaceTempView("sqltable") | |
sqlDF = spark.sql("select album, avg(energy) as avg_energy from sqltable group by album order by avg(energy) desc") | |
sqlDF.show() | |
%sql
-- Mean danceability per album, highest first.
SELECT album,
       avg(danceability) AS avg_danceability
FROM sqltable
GROUP BY album
ORDER BY avg(danceability) DESC
%pyspark | |
#assemble features for clustering | |
from pyspark.ml.feature import VectorAssembler | |
assembler = VectorAssembler(inputCols=['danceability','energy','key','loudness','mode','speechiness','acousticness','instrumentalness','liveness','valence','tempo','duration_ms','time_signature'], outputCol="features") | |
cluster_vars = assembler.transform(dataframe) | |
%pyspark | |
#scale features for clustering | |
from pyspark.ml.feature import StandardScaler | |
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", | |
withStd=True, withMean=True) | |
scalerModel = scaler.fit(cluster_vars) | |
cluster_vars_scaled = scalerModel.transform(cluster_vars) | |
#run the clustering | |
%pyspark | |
from pyspark.ml.clustering import KMeans | |
kmeans = KMeans(featuresCol="scaledFeatures").setK(9).setSeed(1) | |
kmeans_model = kmeans.fit(cluster_vars_scaled) | |
print kmeans_model.computeCost(cluster_vars_scaled) | |
from pyspark.ml.clustering import BisectingKMeans | |
bkm = BisectingKMeans(featuresCol="features").setK(9).setSeed(1) | |
bkm_model = bkm.fit(cluster_vars_scaled) | |
print bkm_model.computeCost(cluster_vars_scaled) | |
from pyspark.ml.clustering import GaussianMixture | |
gmm = GaussianMixture(featuresCol="scaledFeatures").setK(9) | |
gmm_model = gmm.fit(cluster_vars_scaled) | |
#print gmm_model.computeCost(cluster_vars) | |
%pyspark | |
kmeans_pred = kmeans_model.transform(cluster_vars_scaled).select("scaledFeatures", "prediction").toDF("scaledFeatures", "kmeans_pred") | |
bkm_pred = bkm_model.transform(cluster_vars_scaled).select("features", "prediction").toDF("features", "bkm_pred") | |
gmm_pred = gmm_model.transform(cluster_vars_scaled).select("scaledFeatures", "prediction").toDF("scaledFeatures", "gmm_pred") | |
output=cluster_vars_scaled.join(kmeans_pred, on="scaledFeatures") | |
output=output.join(bkm_pred, on="features") | |
output=output.join(gmm_pred, on="scaledFeatures") | |
print output.dtypes | |
output.select("track", "kmeans_pred", "bkm_pred", "gmm_pred").show(n=5) | |
%pyspark | |
output.write.csv('s3://bucket/clustering/cluster_output.csv') | |
%pyspark | |
output.select("track", 'album', 'danceability','energy','key','loudness','mode','speechiness','acousticness','instrumentalness','liveness','valence','tempo','duration_ms','time_signature', 'kmeans_pred', 'bkm_pred', 'gmm_pred').createOrReplaceTempView("sqloutput") | |
%sql
-- Preview the first ten rows of the clustered output view.
SELECT *
FROM sqloutput
LIMIT 10
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment