{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Template / example of using Spark from a notebook"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Setup classpath"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36msparkHome\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"/spark/1.6.0-bin-scala-2.11_hadoop-2.6\"\u001b[0m\n",
"\u001b[36mhadoopConfDir\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"/etc/hadoop\"\u001b[0m\n",
"\u001b[36msparkAssembly\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"/spark/1.6.0-bin-scala-2.11_hadoop-2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar\"\u001b[0m\n",
"\u001b[36msparkMaster\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"yarn-client\"\u001b[0m\n",
"\u001b[36msparkVersion\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"1.6.0\"\u001b[0m\n",
"\u001b[36mhadoopVersion\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"2.6.0\"\u001b[0m\n",
"\u001b[36mscalaVersion\u001b[0m: \u001b[32mString\u001b[0m = \u001b[32m\"2.11.7\"\u001b[0m"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"val sparkHome = \"/spark/1.6.0-bin-scala-2.11_hadoop-2.6\"\n",
"val hadoopConfDir = \"/etc/hadoop\" // should contain things like yarn-site.xml or hdfs-site.xml\n",
"val sparkAssembly = s\"$sparkHome/lib/spark-assembly-1.6.0-hadoop2.6.0.jar\"\n",
"val sparkMaster = \"yarn-client\"\n",
"val sparkVersion = \"1.6.0\"\n",
"val hadoopVersion = \"2.6.0\"\n",
"val scalaVersion = scala.util.Properties.versionNumberString"
]
},
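{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check, a minimal sketch assuming `hadoopConfDir` exists on the machine running the kernel: list the files under it to confirm the YARN/HDFS configs are actually there (`yarn-site.xml`, `hdfs-site.xml`, etc.)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Guard against a missing directory, where listFiles returns null\n",
"Option(new java.io.File(hadoopConfDir).listFiles)\n",
"  .map(_.map(_.getName).sorted.toList)\n",
"  .getOrElse(Nil)"
]
},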
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Adding 157 artifact(s)\n"
]
},
{
"data": {
"text/plain": []
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"classpath.addPath(hadoopConfDir)\n",
"\n",
"classpath.add(\n",
" \"com.github.alexarchambault.ammonium\" % s\"spark_${sparkVersion}_${scalaVersion}\" % \"0.4.0-M3\",\n",
" \"org.apache.spark\" %% \"spark-yarn\" % sparkVersion,\n",
" \"org.apache.hadoop\" % \"hadoop-client\" % hadoopVersion,\n",
" \"org.apache.hadoop\" % \"hadoop-yarn-server-web-proxy\" % hadoopVersion,\n",
" \"org.apache.hadoop\" % \"hadoop-aws\" % hadoopVersion\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create handle, import `sparkConf` and `sc` (SparkContext) from it"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mSpark\u001b[0m: \u001b[32mammonite\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mSpark\u001b[0m = Spark(uninitialized)"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"@transient val Spark = new ammonite.spark.Spark"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[32mimport \u001b[36mSpark.{ sparkConf, sc, sqlContext }\u001b[0m"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import Spark.{ sparkConf, sc, sqlContext }"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Setup config"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mres4\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mSparkConf\u001b[0m = org.apache.spark.SparkConf@2be23e76"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sparkConf\n",
" .setMaster(sparkMaster)\n",
" .setAppName(\"notebook-test\")\n",
" .set(\"spark.executor.memory\", \"8g\")\n",
" .set(\"spark.shuffle.service.enabled\", \"true\")\n",
" .set(\"spark.dynamicAllocation.enabled\", \"true\")\n",
" .set(\"spark.dynamicAllocation.minExecutors\", \"0\")\n",
" .set(\"spark.dynamicAllocation.maxExecutors\", \"4\")\n",
" .set(\"spark.home\", sparkHome)\n",
" .set(\"spark.yarn.jar\", sparkAssembly)"
]
},
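{
"cell_type": "markdown",
"metadata": {},
"source": [
"Settings can be read back before the context is created; a minimal check of one of the keys set above (`SparkConf.get` throws if the key is unset):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Should echo the value set above, \"8g\"\n",
"sparkConf.get(\"spark.executor.memory\")"
]
},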
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Force creation of the SparkContext\n",
"\n",
"It is lazily initialized upon first call."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mres5\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mSparkContext\u001b[0m = SparkContext"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sc"
]
},
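{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the context is up, a couple of read-only accessors confirm what it connected to (nothing cluster-specific is assumed here):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Plain accessors on SparkContext: the Spark version and the master URL\n",
"(sc.version, sc.master)"
]
},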
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Basic test"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36maccum\u001b[0m: \u001b[32morg\u001b[0m.\u001b[32mapache\u001b[0m.\u001b[32mspark\u001b[0m.\u001b[32mAccumulator\u001b[0m[\u001b[32mInt\u001b[0m] = 0"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"val accum = sc.accumulator(0)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": []
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sc.parallelize(1 to 10).foreach(x => accum += x)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mres8\u001b[0m: \u001b[32mInt\u001b[0m = \u001b[32m55\u001b[0m"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"accum.value"
]
},
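{
"cell_type": "markdown",
"metadata": {},
"source": [
"`sqlContext` was imported above but not exercised; here is a minimal sketch against the Spark 1.6 DataFrame API, using only generated data (no cluster files assumed):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// range() yields a DataFrame with a single Long column named \"id\";\n",
"// filter() accepts a SQL expression string\n",
"sqlContext.range(0, 1000).filter(\"id % 2 = 0\").count()"
]
},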
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Stop the context\n",
"Frees its resources from the cluster."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": []
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sc.stop()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Scala 2.11",
"language": "scala211",
"name": "scala211"
},
"language_info": {
"codemirror_mode": "text/x-scala",
"file_extension": ".scala",
"mimetype": "text/x-scala",
"name": "scala211",
"pygments_lexer": "scala",
"version": "2.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 0
}