Simple content similarity computation in Spark with Jupyter Scala kernel
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mres0_0\u001b[0m: \u001b[32mjupyter\u001b[0m.\u001b[32mscala\u001b[0m.\u001b[32mBuildInfo\u001b[0m.type = version: 0.3.0-SNAPSHOT, ammoniumVersion: 0.4.0-M5\n",
"\u001b[36mres0_1\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"2.11.8\"\u001b[0m"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"jupyter.scala.BuildInfo\n",
"scala.util.Properties.versionNumberString"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"142 new artifact(s)\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"143 new artifacts in macro\n",
"143 new artifacts in runtime\n",
"143 new artifacts in compile\n"
]
},
{
"data": {
"text/plain": [
"\u001b[36mammoniumVersion\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"0.4.0-M5\"\u001b[0m\n",
"\u001b[36mscalaVersion\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"2.11.8\"\u001b[0m\n",
"\u001b[36msparkHome\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"/opt/spark\"\u001b[0m\n",
"\u001b[36mres1_10\u001b[0m: \u001b[32mMap\u001b[0m[\u001b[32mString\u001b[0m, \u001b[32mSet\u001b[0m[(\u001b[32mString\u001b[0m, \u001b[32mString\u001b[0m, \u001b[32mString\u001b[0m)]] = \u001b[33mMap\u001b[0m(\n",
"  \u001b[32m\"macro\"\u001b[0m -> \u001b[33mSet\u001b[0m(\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"org.scala-lang\"\u001b[0m, \u001b[32m\"scala-compiler\"\u001b[0m, \u001b[32m\"2.11.8\"\u001b[0m),\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"com.github.alexarchambault.jupyter\"\u001b[0m, \u001b[32m\"scala-api_2.11.8\"\u001b[0m, \u001b[32m\"0.3.0-SNAPSHOT\"\u001b[0m),\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"com.github.alexarchambault.ammonium\"\u001b[0m, \u001b[32m\"spark_1.5.2_2.11.8\"\u001b[0m, \u001b[32m\"0.4.0-M5\"\u001b[0m)\n",
"  ),\n",
"  \u001b[32m\"runtime\"\u001b[0m -> \u001b[33mSet\u001b[0m(\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"com.github.alexarchambault.jupyter\"\u001b[0m, \u001b[32m\"scala-api_2.11.8\"\u001b[0m, \u001b[32m\"0.3.0-SNAPSHOT\"\u001b[0m),\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"com.github.alexarchambault.ammonium\"\u001b[0m, \u001b[32m\"spark_1.5.2_2.11.8\"\u001b[0m, \u001b[32m\"0.4.0-M5\"\u001b[0m)\n",
"  ),\n",
"  \u001b[32m\"compile\"\u001b[0m -> \u001b[33mSet\u001b[0m(\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"com.github.alexarchambault.jupyter\"\u001b[0m, \u001b[32m\"scala-api_2.11.8\"\u001b[0m, \u001b[32m\"0.3.0-SNAPSHOT\"\u001b[0m),\n",
"    \u001b[33m\u001b[0m(\u001b[32m\"com.github.alexarchambault.ammonium\"\u001b[0m, \u001b[32m\"spark_1.5.2_2.11.8\"\u001b[0m, \u001b[32m\"0.4.0-M5\"\u001b[0m)\n",
"  )\n",
")"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"val ammoniumVersion = jupyter.scala.BuildInfo.ammoniumVersion\n",
"val scalaVersion = scala.util.Properties.versionNumberString\n",
"\n",
"val sparkHome = \"/opt/spark\"\n",
"\n",
"classpath.addPath(s\"$sparkHome/conf\")\n",
"classpath.addPath(s\"$sparkHome/lib/spark-assembly-1.5.3-SNAPSHOT-hadoop2.6.0-cdh5.5.1.jar\")\n",
"classpath.addPath(s\"$sparkHome/lib/datanucleus-api-jdo-3.2.6.jar\")\n",
"classpath.addPath(s\"$sparkHome/lib/datanucleus-rdbms-3.2.9.jar\")\n",
"classpath.addPath(s\"$sparkHome/lib/datanucleus-core-3.2.10.jar\")\n",
"classpath.addPath(\"/etc/hadoop/conf\")\n",
"\n",
"classpath.add(\"com.github.alexarchambault.ammonium\" % s\"spark_1.5.2_$scalaVersion\" % ammoniumVersion)\n",
"\n",
"classpath.dependencies"
]
},
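{
"cell_type": "markdown",
"metadata": {},
"source": [
"A note on the cell above: `classpath.addPath` puts a local directory or jar on the kernel's classpath, while `classpath.add` resolves a Maven coordinate (via coursier) and downloads it plus its transitive dependencies. The Spark assembly and DataNucleus jars come from a local CDH 5.5.1 build under `/opt/spark`; the `spark_1.5.2` ammonium module bridges the kernel to that Spark version. As an illustration only (the coordinate below is an arbitrary example, not something this notebook needs):\n",
"\n",
"```scala\n",
"// Hypothetical: any published Maven coordinate resolves the same way.\n",
"classpath.add(\"org.apache.commons\" % \"commons-math3\" % \"3.6.1\")\n",
"```"
]
},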
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"defined \u001b[32mfunction \u001b[36mgetPropertiesFromFile\u001b[0m"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"def getPropertiesFromFile(filename: String): Map[String, String] = {\n",
"  import java.io._\n",
"  import java.util._\n",
"  import scala.collection.JavaConversions._\n",
"\n",
"  val properties = new Properties\n",
"  val inReader = new InputStreamReader(new FileInputStream(filename), \"UTF-8\")\n",
"\n",
"  try {\n",
"    properties.load(inReader)\n",
"  } finally {\n",
"    inReader.close\n",
"  }\n",
"\n",
"  properties.stringPropertyNames.map(k => (k, properties(k).trim)).toMap\n",
"}\n",
"\n",
"getPropertiesFromFile(s\"$sparkHome/conf/spark-defaults.conf\")\n",
"  .filter { case (k, v) => k.startsWith(\"spark.\") }\n",
"  .foreach { case (k, v) =>\n",
"    sys.props.getOrElseUpdate(k, v)\n",
"  }"
]
},
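{
"cell_type": "markdown",
"metadata": {},
"source": [
"Why the cell above is needed: `spark-defaults.conf` is only read automatically by `spark-submit`, not by an embedded driver like this kernel, so the notebook loads the file by hand and copies each `spark.*` entry into JVM system properties. Using `getOrElseUpdate` means properties already set elsewhere win over the file. A `SparkConf` built with `loadDefaults = true` then picks those properties up. A minimal sketch of that mechanism (the property name is made up for illustration):\n",
"\n",
"```scala\n",
"// Any system property whose key starts with \"spark.\" is loaded\n",
"// into a SparkConf constructed with loadDefaults = true.\n",
"sys.props(\"spark.example.setting\") = \"42\"\n",
"val checkConf = new org.apache.spark.SparkConf(true)\n",
"assert(checkConf.get(\"spark.example.setting\") == \"42\")\n",
"```"
]
},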
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"SLF4J: Class path contains multiple SLF4J bindings.\n",
"SLF4J: Found binding in [jar:file:/opt/spark/lib/spark-assembly-1.5.3-SNAPSHOT-hadoop2.6.0-cdh5.5.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
"SLF4J: Found binding in [jar:file:/home/msa/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
"SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.\n",
"SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]\n"
]
},
{
"data": {
"text/plain": [
"\u001b[36mSpark\u001b[0m: \u001b[32mammonite\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mSpark\u001b[0m = Spark\n",
"\u001b[36mconf\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mSparkConf\u001b[0m = org.apache.spark.SparkConf@58d80e3f\n",
"\u001b[36msc\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mSparkContext\u001b[0m = SparkContext\n",
"\u001b[36msqlContext\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32msql\u001b[0m.\u001b[32mSQLContext\u001b[0m = org.apache.spark.sql.SQLContext@389638b7\n",
"\u001b[36msql\u001b[0m: \u001b[32mString\u001b[0m => \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32msql\u001b[0m.\u001b[32mDataFrame\u001b[0m = <function1>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.toLevel(\"ERROR\"))\n",
"\n",
"@transient val Spark = new ammonite.spark.Spark\n",
"\n",
"@transient val conf =\n",
"  Spark\n",
"    .sparkConf\n",
"    .setAppName(\"Content Similarity\")\n",
"    .set(\"spark.home\", sparkHome)\n",
"\n",
"@transient val sc = Spark.sc\n",
"@transient val sqlContext = Spark.sqlContext\n",
"@transient val sql = sqlContext.sql _"
]
},
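{
"cell_type": "markdown",
"metadata": {},
"source": [
"`ammonite.spark.Spark` wires a `SparkContext` into the kernel so classes compiled from notebook cells can be shipped to executors; `Spark.sc` and `Spark.sqlContext` are created on first use from the `sparkConf` configured above. The `@transient` annotations keep these driver-side handles from being captured in closures that Spark serializes. A quick smoke test, assuming the context above came up cleanly:\n",
"\n",
"```scala\n",
"// Trivial job that exercises the executors end to end.\n",
"val total = sc.parallelize(1 to 100).reduce(_ + _)\n",
"assert(total == 5050)\n",
"```"
]
},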
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[32mimport \u001b[36msqlContext.implicits._\u001b[0m\n",
"\u001b[36mratingsDf\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32msql\u001b[0m.\u001b[32mDataFrame\u001b[0m = [user_id: int, content_id: int, rating: double]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import sqlContext.implicits._\n",
"val ratingsDf = Seq((0, 0, 0.0), (1, 0, 3.0), (2, 1, 4.0), (0, 1, 4.0), (1, 1, 3.0)).toDF(\"user_id\", \"content_id\", \"rating\")\n",
"ratingsDf.registerTempTable(\"ratings\")"
]
},
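{
"cell_type": "markdown",
"metadata": {},
"source": [
"The sample data is five `(user_id, content_id, rating)` triples covering three users and two content items; `import sqlContext.implicits._` provides the `toDF` conversion, and `registerTempTable` exposes the frame to SQL. An equivalent construction from a case class, shown only as an alternative sketch with the same schema and data:\n",
"\n",
"```scala\n",
"case class Rating(user_id: Int, content_id: Int, rating: Double)\n",
"val altRatingsDf = sc.parallelize(Seq(\n",
"  Rating(0, 0, 0.0), Rating(1, 0, 3.0), Rating(2, 1, 4.0),\n",
"  Rating(0, 1, 4.0), Rating(1, 1, 3.0))).toDF\n",
"```"
]
},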
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[32mimport \u001b[36morg.apache.spark.mllib.linalg._, distributed._\u001b[0m\n",
"\u001b[36mcontentItemCount\u001b[0m: \u001b[32mInt\u001b[0m = \u001b[32m2\u001b[0m\n",
"\u001b[36mratingsRdd\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mrdd\u001b[0m.\u001b[32mRDD\u001b[0m[\u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mmllib\u001b[0m.\u001b[32mlinalg\u001b[0m.\u001b[32mVector\u001b[0m] = MapPartitionsRDD[7] at map at Main.scala:42\n",
"\u001b[36mratingsMatrix\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mmllib\u001b[0m.\u001b[32mlinalg\u001b[0m.\u001b[32mdistributed\u001b[0m.\u001b[32mRowMatrix\u001b[0m = org.apache.spark.mllib.linalg.distributed.RowMatrix@685d1532"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import org.apache.spark.mllib.linalg._, distributed._\n",
"\n",
"val contentItemCount = 2\n",
"val ratingsRdd =\n",
"  sqlContext.table(\"ratings\")\n",
"    .map(row => (row.getInt(0), row.getInt(1), row.getDouble(2)))\n",
"    .groupBy { case (userId, _, _) => userId }\n",
"    .map { case (_, ratingTriples) => ratingTriples }\n",
"    .map(_.map { case (_, contentId, rating) => (contentId, rating) }.toSeq)\n",
"    .map(contentRatingSeq => Vectors.sparse(contentItemCount, contentRatingSeq))\n",
"val ratingsMatrix = new RowMatrix(ratingsRdd)"
]
},
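{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cell above pivots the ratings into a user-by-content matrix: `groupBy` collects each user's ratings onto one partition (this is a shuffle), and each group becomes a sparse vector of length `contentItemCount` indexed by `content_id`. With the sample data the rows look like this (row order is arbitrary and does not affect column similarities):\n",
"\n",
"```scala\n",
"// user 1 rated content 0 and content 1, both 3.0:\n",
"Vectors.sparse(2, Seq((0, 3.0), (1, 3.0)))\n",
"// user 2 rated only content 1, so index 0 is simply absent:\n",
"Vectors.sparse(2, Seq((1, 4.0)))\n",
"```"
]
},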
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": []
},
{
"data": {
"text/plain": [
"\u001b[36mcontentSimilarityMatrix\u001b[0m: \u001b[32mCoordinateMatrix\u001b[0m = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@4629be1d"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"val contentSimilarityMatrix = ratingsMatrix.columnSimilarities"
]
},
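{
"cell_type": "markdown",
"metadata": {},
"source": [
"`columnSimilarities` computes the cosine similarity between every pair of columns (content items):\n",
"\n",
"$$\\mathrm{sim}(a, b) = \\frac{a \\cdot b}{\\lVert a \\rVert \\, \\lVert b \\rVert}$$\n",
"\n",
"The result is an upper-triangular `CoordinateMatrix` (one entry per pair with `i < j`). For matrices with many columns there is also a sampling variant (DIMSUM) that trades accuracy for speed; the threshold below is an arbitrary illustrative value:\n",
"\n",
"```scala\n",
"// Pairs whose similarity is below the threshold may be missed,\n",
"// in exchange for less shuffle traffic on wide matrices.\n",
"val approxSimilarities = ratingsMatrix.columnSimilarities(0.1)\n",
"```"
]
},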
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36msimilarityDf\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32msql\u001b[0m.\u001b[32mDataFrame\u001b[0m = [content_id1: bigint, content_id2: bigint, cosine_sim: double]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"val similarityDf =\n",
"  contentSimilarityMatrix\n",
"    .entries\n",
"    .map(entry => (entry.i, entry.j, entry.value))\n",
"    .toDF(\"content_id1\", \"content_id2\", \"cosine_sim\")\n",
"similarityDf.registerTempTable(\"similarity\")"
]
},
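{
"cell_type": "markdown",
"metadata": {},
"source": [
"`entries` yields `MatrixEntry(i: Long, j: Long, value: Double)` objects, which is why the resulting columns come out as `bigint`. Since `MatrixEntry` is a case class (already in scope via the `distributed._` import above), the same mapping can equivalently pattern-match:\n",
"\n",
"```scala\n",
"val altSimilarityDf = contentSimilarityMatrix.entries\n",
"  .map { case MatrixEntry(i, j, v) => (i, j, v) }\n",
"  .toDF(\"content_id1\", \"content_id2\", \"cosine_sim\")\n",
"```"
]
},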
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+-----------+------------------+\n",
"|content_id1|content_id2|        cosine_sim|\n",
"+-----------+-----------+------------------+\n",
"|          0|          1|0.4685212856658182|\n",
"+-----------+-----------+------------------+\n",
"\n"
]
},
{
"data": {
"text/plain": []
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sql(\"select * from similarity order by content_id1, content_id2\").show"
]
},
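{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hand check against the sample data confirms the value above. Over users (0, 1, 2), the content-0 column is $(0, 3, 0)$ and the content-1 column is $(4, 3, 4)$, so\n",
"\n",
"$$\\frac{0 \\cdot 4 + 3 \\cdot 3 + 0 \\cdot 4}{3 \\sqrt{41}} = \\frac{9}{3 \\sqrt{41}} \\approx 0.46852,$$\n",
"\n",
"matching the `cosine_sim` reported by the query."
]
},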
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": []
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sc.stop"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Scala 2.11",
"language": "scala211",
"name": "scala211"
},
"language_info": {
"codemirror_mode": "text/x-scala",
"file_extension": ".scala",
"mimetype": "text/x-scala",
"name": "scala211",
"pygments_lexer": "scala",
"version": "2.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 0
}