Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save alexarchambault/e309c8f735a4278f6912107ee72fe7a2 to your computer and use it in GitHub Desktop.
Save alexarchambault/e309c8f735a4278f6912107ee72fe7a2 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Based on the Scio WordCount example: https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/WordCount.scala"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n",
"\u001b[39m\n",
"\u001b[32mimport \u001b[39m\u001b[36mjupyter.scio._\n",
"\n",
"\u001b[39m\n",
"\u001b[32mimport \u001b[39m\u001b[36mcom.spotify.scio._\n",
"\u001b[39m\n",
"\u001b[32mimport \u001b[39m\u001b[36mcom.spotify.scio.accumulators._\n",
"\u001b[39m\n",
"\u001b[32mimport \u001b[39m\u001b[36mcom.spotify.scio.bigquery._\n",
"\u001b[39m\n",
"\u001b[32mimport \u001b[39m\u001b[36mcom.spotify.scio.experimental._\n",
"\n",
"\u001b[39m\n",
"\u001b[36msc\u001b[39m: \u001b[32mcom\u001b[39m.\u001b[32mspotify\u001b[39m.\u001b[32mscio\u001b[39m.\u001b[32mjupyter\u001b[39m.\u001b[32mJupyterScioContext\u001b[39m = com.spotify.scio.jupyter.JupyterScioContext@59b6baa2"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import $ivy.`org.jupyter-scala::scio:0.4.0-RC1`\n",
"import jupyter.scio._\n",
"\n",
"import com.spotify.scio._\n",
"import com.spotify.scio.accumulators._\n",
"import com.spotify.scio.bigquery._\n",
"import com.spotify.scio.experimental._\n",
"\n",
"val sc = JupyterScioContext(\n",
" \"runner\" -> \"DataflowPipelineRunner\",\n",
" \"project\" -> \"jupyter-scala\",\n",
" \"stagingLocation\" -> \"gs://test-bucket/jupyter-scala-scio-test\"\n",
").withGcpCredential(sys.props(\"user.home\") + \"/.gcp/credentials.json\")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mmax\u001b[39m: \u001b[32mvalues\u001b[39m.\u001b[32mAccumulator\u001b[39m[\u001b[32mInt\u001b[39m] = com.spotify.scio.ScioContext$$anonfun$maxAccumulator$1$$anon$2@3c750432\n",
"\u001b[36mmin\u001b[39m: \u001b[32mvalues\u001b[39m.\u001b[32mAccumulator\u001b[39m[\u001b[32mInt\u001b[39m] = com.spotify.scio.ScioContext$$anonfun$minAccumulator$1$$anon$3@1ac6335f\n",
"\u001b[36msumNonEmpty\u001b[39m: \u001b[32mvalues\u001b[39m.\u001b[32mAccumulator\u001b[39m[\u001b[32mLong\u001b[39m] = com.spotify.scio.ScioContext$$anonfun$sumAccumulator$1$$anon$4@755f86e2\n",
"\u001b[36msumEmpty\u001b[39m: \u001b[32mvalues\u001b[39m.\u001b[32mAccumulator\u001b[39m[\u001b[32mLong\u001b[39m] = com.spotify.scio.ScioContext$$anonfun$sumAccumulator$1$$anon$4@3d7084e7\n",
"\u001b[36mres1_4\u001b[39m: \u001b[32mconcurrent\u001b[39m.\u001b[32mFuture\u001b[39m[\u001b[32mio\u001b[39m.\u001b[32mTap\u001b[39m[\u001b[32mString\u001b[39m]] = List()"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val max = sc.maxAccumulator[Int](\"maxLineLength\")\n",
"val min = sc.minAccumulator[Int](\"minLineLength\")\n",
"val sumNonEmpty = sc.sumAccumulator[Long](\"nonEmptyLines\")\n",
"val sumEmpty = sc.sumAccumulator[Long](\"emptyLines\")\n",
"\n",
"sc.textFile(\"gs://dataflow-samples/shakespeare/kinglear.txt\")\n",
" .map(_.trim)\n",
" .accumulateBy(max, min)(_.length)\n",
" .accumulateCountFilter(sumNonEmpty, sumEmpty)(_.nonEmpty)\n",
" .flatMap(_.split(\"[^a-zA-Z']+\").filter(_.nonEmpty))\n",
" .countByValue\n",
" .map(t => t._1 + \": \" + t._2)\n",
" .saveAsTextFile(\"gs://test-bucket/jupyter-scala-scio-test/results/output\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Dataflow SDK version: 1.8.0\n",
"Submitted job: 2016-12-03_07_19_27-2970784770332319618\n"
]
},
{
"data": {
"text/plain": [
"\u001b[36mresult\u001b[39m: \u001b[32mScioResult\u001b[39m = com.spotify.scio.ScioResult@e9cd954"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"val result = sc.close()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[36mres3_0\u001b[39m: \u001b[32mcom\u001b[39m.\u001b[32mgoogle\u001b[39m.\u001b[32mcloud\u001b[39m.\u001b[32mdataflow\u001b[39m.\u001b[32msdk\u001b[39m.\u001b[32mPipelineResult\u001b[39m.\u001b[32mState\u001b[39m = DONE\n",
"\u001b[36mres3_1\u001b[39m: \u001b[32mInt\u001b[39m = \u001b[32m69\u001b[39m\n",
"\u001b[36mres3_2\u001b[39m: \u001b[32mInt\u001b[39m = \u001b[32m0\u001b[39m\n",
"\u001b[36mres3_3\u001b[39m: \u001b[32mLong\u001b[39m = \u001b[32m3862L\u001b[39m\n",
"\u001b[36mres3_4\u001b[39m: \u001b[32mLong\u001b[39m = \u001b[32m1663L\u001b[39m"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"result.state\n",
"\n",
"result.accumulatorTotalValue(max)\n",
"result.accumulatorTotalValue(min)\n",
"result.accumulatorTotalValue(sumNonEmpty)\n",
"result.accumulatorTotalValue(sumEmpty)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Scala",
"language": "scala",
"name": "scala"
},
"language_info": {
"codemirror_mode": "text/x-scala",
"file_extension": ".scala",
"mimetype": "text/x-scala",
"name": "scala211",
"nbconvert_exporter": "scala",
"pygments_lexer": "scala",
"version": "2.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment