Gist by @AashishTiwari, created February 9, 2017.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Visualizing Clusters of Clickbait Headlines Using Spark and Word2vec"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"spark_path = \"/home/ubuntu/spark-2.0.0-bin-hadoop2.7\"\n",
"\n",
"import findspark\n",
"findspark.init(spark_path)\n",
"\n",
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"sc = pyspark.SparkContext(appName='fb_headlines')\n",
"spark = SparkSession(sc)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Web URL: 172.31.3.185:4040\n"
]
}
],
"source": [
"config = sc._conf.getAll()\n",
"print 'Web URL: ' + filter(lambda x: 'spark.driver.host' in x[0], config)[0][1] + ':4040'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Add a function to preprocess each DF."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.functions import regexp_replace\n",
"from pyspark.sql import Column\n",
"from pyspark.sql.types import *\n",
"\n",
"\n",
"\n",
"def process_df(df):\n",
" return (df\n",
" .withColumn('text',regexp_replace('link_name', '\\'', ''))\n",
" .filter(\"text != ''\")\n",
" .filter(\"text != 'Timeline Photos'\")\n",
" )\n",
"\n",
"def read_tsv(path):\n",
" \n",
" # predefine data frame schema: faster\n",
" \n",
" schema = StructType([\n",
" StructField(\"page_id\", StringType(), True),\n",
" StructField(\"status_id\", StringType(), True),\n",
" StructField(\"link_name\", StringType(), True),\n",
" StructField(\"status_published\", TimestampType(), True),\n",
" StructField(\"num_reactions\", IntegerType(), True)])\n",
" \n",
" return spark.read.csv(path ,schema = schema, header=True, inferSchema=True, sep=\"\\t\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+-------------+\n",
"| link_name| status_published|num_reactions|\n",
"+--------------------+--------------------+-------------+\n",
"|Joseph Schooling ...|2016-08-12 21:14:...| 3508|\n",
"|Bill Clinton: Ema...|2016-08-12 21:01:...| 1592|\n",
"|Hacker releases c...|2016-08-12 21:01:...| 446|\n",
"|Lionel Messi anno...|2016-08-12 20:39:...| 1324|\n",
"|Fighting the male...|2016-08-12 20:31:...| 129|\n",
"|The face of the O...|2016-08-12 20:11:...| 1597|\n",
"|Trump: If Clinton...|2016-08-12 19:51:...| 4017|\n",
"|Texas baby found ...|2016-08-12 19:28:...| 4860|\n",
"|Malawi is moving ...|2016-08-12 19:01:...| 310|\n",
"|Thomas Gibson fir...|2016-08-12 18:44:...| 2026|\n",
"|Watch Donald Trum...|2016-08-12 18:01:...| 2600|\n",
"|Watch a timelapse...|2016-08-12 17:22:...| 1299|\n",
"|Poll: Clinton lea...|2016-08-12 16:46:...| 9445|\n",
"|Brendan Dassey's ...|2016-08-12 16:19:...| 9240|\n",
"|Live from Yankee ...|2016-08-12 16:16:...| 1568|\n",
"|Live: The story b...|2016-08-12 16:02:...| 2269|\n",
"|US women's soccer...|2016-08-12 15:46:...| 620|\n",
"|'There are no win...|2016-08-12 15:26:...| 178|\n",
"|Senate Republican...|2016-08-12 14:23:...| 411|\n",
"|Live: Flooding in...|2016-08-12 14:13:...| 3154|\n",
"+--------------------+--------------------+-------------+\n",
"only showing top 20 rows\n",
"\n",
"102267\n",
"root\n",
" |-- page_id: string (nullable = true)\n",
" |-- status_id: string (nullable = true)\n",
" |-- link_name: string (nullable = true)\n",
" |-- status_published: timestamp (nullable = true)\n",
" |-- num_reactions: integer (nullable = true)\n",
" |-- text: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"df_cnn = read_tsv(\"fb_headlines/CNN_fb.tsv\")\n",
"df_nytimes = read_tsv(\"fb_headlines/NYTimes_fb.tsv\")\n",
"df_buzzfeed = read_tsv(\"fb_headlines/BuzzFeed_fb.tsv\")\n",
"df_upworthy = read_tsv(\"fb_headlines/Upworthy_fb.tsv\")\n",
"\n",
"df = df_cnn.union(df_nytimes).union(df_buzzfeed).union(df_upworthy)\n",
"df = process_df(df).cache()\n",
"\n",
"df.select('link_name', 'status_published', 'num_reactions').show()\n",
"print df.count()\n",
"df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+\n",
"| text| words|\n",
"+--------------------+--------------------+\n",
"|Joseph Schooling ...|[joseph, schoolin...|\n",
"|Bill Clinton: Ema...|[bill, clinton, e...|\n",
"|Hacker releases c...|[hacker, releases...|\n",
"|Lionel Messi anno...|[lionel, messi, a...|\n",
"|Fighting the male...|[fighting, the, m...|\n",
"|The face of the O...|[the, face, of, t...|\n",
"|Trump: If Clinton...|[trump, if, clint...|\n",
"|Texas baby found ...|[texas, baby, fou...|\n",
"|Malawi is moving ...|[malawi, is, movi...|\n",
"|Thomas Gibson fir...|[thomas, gibson, ...|\n",
"|Watch Donald Trum...|[watch, donald, t...|\n",
"|Watch a timelapse...|[watch, a, timela...|\n",
"|Poll: Clinton lea...|[poll, clinton, l...|\n",
"|Brendan Dasseys c...|[brendan, dasseys...|\n",
"|Live from Yankee ...|[live, from, yank...|\n",
"|Live: The story b...|[live, the, story...|\n",
"|US womens soccer ...|[us, womens, socc...|\n",
"|There are no winn...|[there, are, no, ...|\n",
"|Senate Republican...|[senate, republic...|\n",
"|Live: Flooding in...|[live, flooding, ...|\n",
"+--------------------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.ml.feature import Tokenizer, RegexTokenizer\n",
"\n",
"tokenizer = RegexTokenizer(pattern=\"[^\\w]\",inputCol=\"text\", outputCol=\"words\")\n",
"df = tokenizer.transform(df)\n",
"df.select('text','words').show()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"u\"inputCol: input column name. (current: words)\\nmaxIter: max number of iterations (>= 0). (default: 1)\\nmaxSentenceLength: Maximum length (in words) of each sentence in the input data. Any sentence longer than this threshold will be divided into chunks up to the size. (default: 1000)\\nminCount: the minimum number of times a token must appear to be included in the word2vec model's vocabulary (default: 5)\\nnumPartitions: number of partitions for sentences of words (default: 1)\\noutputCol: output column name. (default: Word2Vec_4489a7007b655c817b2c__output, current: vectors)\\nseed: random seed. (default: 724255263782197636, current: 42)\\nstepSize: Step size to be used for each iteration of optimization (>= 0). (default: 0.025)\\nvectorSize: the dimension of codes after transforming from words (default: 100, current: 50)\\nwindowSize: the window size (context words from [-window, window]). Default value is 5 (default: 5)\""
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.ml.feature import Word2Vec\n",
"\n",
"word2Vec = Word2Vec(vectorSize=50, seed=42, inputCol=\"words\", outputCol=\"vectors\")\n",
"word2Vec.explainParams()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+\n",
"| words| vectors|\n",
"+--------------------+--------------------+\n",
"|[joseph, schoolin...|[-0.0295360057022...|\n",
"|[bill, clinton, e...|[0.00839712098240...|\n",
"|[hacker, releases...|[-0.0453028933377...|\n",
"|[lionel, messi, a...|[-0.0399187423288...|\n",
"|[fighting, the, m...|[-0.0419914640951...|\n",
"|[the, face, of, t...|[-0.0460251685231...|\n",
"|[trump, if, clint...|[0.00891906013047...|\n",
"|[texas, baby, fou...|[-0.0943986974656...|\n",
"|[malawi, is, movi...|[0.04506426247826...|\n",
"|[thomas, gibson, ...|[-0.1057210512226...|\n",
"|[watch, donald, t...|[-0.0874061348537...|\n",
"|[watch, a, timela...|[0.06606105796527...|\n",
"|[poll, clinton, l...|[0.06775298264498...|\n",
"|[brendan, dasseys...|[0.04835192763776...|\n",
"|[live, from, yank...|[-0.0411922155568...|\n",
"|[live, the, story...|[-0.0622380969247...|\n",
"|[us, womens, socc...|[-0.0382462517757...|\n",
"|[there, are, no, ...|[0.07911438743273...|\n",
"|[senate, republic...|[-0.0546692525036...|\n",
"|[live, flooding, ...|[0.02606966607272...|\n",
"+--------------------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"\n",
"model = word2Vec.fit(df)\n",
"df = model.transform(df)\n",
"\n",
"df.select('words', 'vectors').show()"
]
},
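{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check (an aside added here, not part of the original run): `Word2VecModel.getVectors()` returns the learned per-word embeddings as a DataFrame, which is handy for eyeballing the vocabulary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Peek at the learned embeddings: one 50-D vector per vocabulary word.\n",
"# Assumes `model` is still the fitted Word2VecModel from the previous cell.\n",
"model.getVectors().show(5)"
]
},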
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Not required for analysis, but fun to check!"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+------------------+\n",
"| word| similarity|\n",
"+-----------+------------------+\n",
"| deer|0.7518758500661575|\n",
"| apocalypse|0.7027730099556968|\n",
"| happiest|0.6720302546187348|\n",
"| vmas|0.6665068107149325|\n",
"| upshot|0.6659036225560672|\n",
"| grid|0.6645280176415026|\n",
"| reclaiming|0.6620794275524378|\n",
"| opening|0.6545831443299779|\n",
"|opinionator|0.6539698447594439|\n",
"| oscars|0.6491699200786258|\n",
"| countdown|0.6488588551173281|\n",
"| wild|0.6446697158905897|\n",
"| extreme| 0.633502635230153|\n",
"| largest| 0.63119806576113|\n",
"| carpet|0.6290782842323763|\n",
"| ceremony|0.6275797991993366|\n",
"| simone|0.6215671608391896|\n",
"| longest|0.6200576886880117|\n",
"| brightest|0.6186380473151332|\n",
"| smallest|0.6142787876081013|\n",
"+-----------+------------------+\n",
"\n",
"+------------+------------------+\n",
"| word| similarity|\n",
"+------------+------------------+\n",
"|presidential|0.7822506427244476|\n",
"| democratic|0.7412928207831224|\n",
"| republican|0.7373115162062022|\n",
"| pundit|0.7338010365508537|\n",
"| midterm|0.7196630916857696|\n",
"| highlights|0.7053224986060211|\n",
"| debates|0.7052636851785653|\n",
"| primary| 0.700160171570455|\n",
"| candidates|0.6962582299145136|\n",
"| speeches|0.6797515274313827|\n",
"| blogging|0.6715483654863899|\n",
"| contenders|0.6672229041029176|\n",
"| nominees|0.6627767427490374|\n",
"| joins|0.6588803757136239|\n",
"| nominations|0.6556532007039568|\n",
"| analysis|0.6475339005855463|\n",
"| announces|0.6465768112787106|\n",
"| yales| 0.646489205854291|\n",
"| winners|0.6460696717549468|\n",
"| heartland|0.6407706266686899|\n",
"+------------+------------------+\n",
"\n"
]
}
],
"source": [
"model.findSynonyms(\"olympics\", 20).show()\n",
"model.findSynonyms(\"election\", 20).show()"
]
},
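{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another aside (my addition, not in the original run): `findSynonyms` also accepts a raw vector, so you can probe word-analogy arithmetic on the learned embeddings. The words below are illustrative and assume each cleared the `minCount` threshold."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.ml.linalg import Vectors\n",
"\n",
"# Collect the (small) vocabulary to the driver as word -> numpy array.\n",
"wv = {r.word: r.vector.toArray() for r in model.getVectors().collect()}\n",
"\n",
"# Hypothetical analogy probe: clinton - democratic + republican ~= ?\n",
"analogy = Vectors.dense(wv['clinton'] - wv['democratic'] + wv['republican'])\n",
"model.findSynonyms(analogy, 5).show()"
]
},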
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+-------------+\n",
"|page_id| page_ohe|\n",
"+-------+-------------+\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"| CNN|(3,[1],[1.0])|\n",
"+-------+-------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.ml.feature import StringIndexer, OneHotEncoder\n",
"\n",
"stringIndexer = StringIndexer(inputCol=\"page_id\", outputCol=\"indexed\")\n",
"model = stringIndexer.fit(df)\n",
"df = model.transform(df)\n",
"encoder = OneHotEncoder(inputCol=\"indexed\", outputCol=\"page_ohe\")\n",
"df = encoder.transform(df)\n",
"\n",
"df.select('page_id', 'page_ohe').show()"
]
},
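{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged aside (not in the original run): `StringIndexerModel.labels` lists the page IDs in index order, which tells you which one-hot slot belongs to which page."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Index i in the `indexed` column corresponds to labels[i]; the most frequent\n",
"# page gets index 0. OneHotEncoder drops the last category by default, which\n",
"# is why four pages yield a 3-D `page_ohe` vector.\n",
"print indexer_model.labels"
]
},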
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+\n",
"| merged_vectors|\n",
"+--------------------+\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,0.00...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,0.00...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,0.04...|\n",
"|[0.0,1.0,0.0,-0.1...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,0.06...|\n",
"|[0.0,1.0,0.0,0.06...|\n",
"|[0.0,1.0,0.0,0.04...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,0.07...|\n",
"|[0.0,1.0,0.0,-0.0...|\n",
"|[0.0,1.0,0.0,0.02...|\n",
"+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.ml.feature import VectorAssembler\n",
"\n",
"model = VectorAssembler(inputCols=['page_ohe', 'vectors'], outputCol=\"merged_vectors\")\n",
"df = model.transform(df)\n",
"\n",
"df.select('merged_vectors').show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Collect subset of post since June to make visualization more manageable."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"9500\n",
"root\n",
" |-- page_id: string (nullable = true)\n",
" |-- status_id: string (nullable = true)\n",
" |-- link_name: string (nullable = true)\n",
" |-- status_published: timestamp (nullable = true)\n",
" |-- num_reactions: integer (nullable = true)\n",
" |-- text: string (nullable = true)\n",
" |-- words: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
" |-- vectors: vector (nullable = true)\n",
" |-- indexed: double (nullable = true)\n",
" |-- page_ohe: vector (nullable = true)\n",
" |-- merged_vectors: vector (nullable = true)\n",
"\n"
]
}
],
"source": [
"df2 = df.filter(df.status_published > '2016-06-01 00:00:00').cache()\n",
"\n",
"print df2.count()\n",
"df2.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To CSV, for passing into R."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"(df2.select('page_id', 'status_id', 'link_name', 'status_published', 'num_reactions', 'merged_vectors')\n",
" .toPandas()\n",
" .to_csv('fb_headlines_53D.csv', index=False, encoding=\"utf-8\")\n",
" )"
]
},
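{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `toPandas()` collects the entire result to the driver, which is fine at ~9,500 rows but will not scale. A sketch of a Spark-native alternative (my assumption, not part of the original workflow): flatten the vector column to a string with a UDF, then use Spark's distributed CSV writer."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.sql.functions import udf\n",
"from pyspark.sql.types import StringType\n",
"\n",
"# VectorUDT columns can't be written to CSV directly, so stringify them first.\n",
"vec_to_str = udf(lambda v: ','.join(str(x) for x in v.toArray()), StringType())\n",
"\n",
"# Writes a directory of part files; the output name is hypothetical.\n",
"(df2.withColumn('merged_vectors_str', vec_to_str('merged_vectors'))\n",
"    .select('page_id', 'status_id', 'num_reactions', 'merged_vectors_str')\n",
"    .write.csv('fb_headlines_53D_spark', header=True))"
]
},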
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
@AashishTiwari (author) commented:

@GauriShah: can you please take a look?