@maasg
Created November 25, 2016 11:47
Group a dataset of coordinates into cells of a given resolution, starting at the minimal coordinates from the set, using a scalable Spark-based approach
{
"metadata": {
"name": "GeoGrid",
"user_save_timestamp": "1970-01-01T01:00:00.000Z",
"auto_save_timestamp": "1970-01-01T01:00:00.000Z",
"language_info": {
"name": "scala",
"file_extension": "scala",
"codemirror_mode": "text/x-scala"
},
"trusted": true,
"customLocalRepo": null,
"customRepos": null,
"customDeps": null,
"customImports": null,
"customArgs": null,
"customSparkConf": null
},
"cells": [
{
"metadata": {
"id": "4EBFB64175434B52A2A4FD2B9C4FF685"
},
"cell_type": "markdown",
"source": "Created with the Spark Notebook: http://spark-notebook.io/\nInteractive data science with Spark"
},
{
"metadata": {
"id": "5E59107A3CBA4F798CF76C77E0096C8D"
},
"cell_type": "markdown",
"source": "# GeoCells\nGroup a dataset of coordinates into cells of a given resolution, starting at the minimal coordinates from the set.\n\nThe solution should scale to tens of millions of points, so we use Spark and a distributed algorithmic approach."
},
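{
"metadata": {
"id": "A1B2C3D4E5F64789A0B1C2D3E4F50617"
},
"cell_type": "markdown",
"source": "A sketch of the core idea (illustrative values, not from the dataset): each coordinate is mapped to an integer cell index relative to the minimum value of its axis, so points falling within the same `resolution`-sized square share a `(cellx, celly)` pair.\n\n```scala\n// illustrative only: map a coordinate to its cell index\ndef cellIndex(x: Double, minValue: Double, res: Double): Int =\n  ((x - minValue) / res).toInt\n\ncellIndex(45.2153, 45.2, 0.01)  // res: Int = 1\n```"
},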
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "FD192997D35A45988E5BCA873CAABD44"
},
"cell_type": "code",
"source": "val resolution = 0.01",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "resolution: Double = 0.01\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 1,
"time": "Took: 766 milliseconds, at 2016-11-25 11:20"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "430A2E8E590B45D98675CCB4660A7C0A"
},
"cell_type": "code",
"source": "val sampleData = \"/home/maasg/playground/data/sampleGeoCoordinatesWithTs.csv\"",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "sampleData: String = /home/maasg/playground/data/sampleGeoCoordinatesWithTs.csv\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 23,
"time": "Took: 530 milliseconds, at 2016-11-25 11:47"
}
]
},
{
"metadata": {
"id": "83DDE7BED0BB46B08D3326A2E12C9A9C"
},
"cell_type": "markdown",
"source": "We read the sample data using the CSV support from Spark 2.0"
},
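{
"metadata": {
"id": "B7C8D9E0F1A24B3C8D5E6F7081920A3B"
},
"cell_type": "markdown",
"source": "The file has no header row, so the columns are named explicitly with `toDF`. An equivalent read with an explicit schema instead of inference would look like this (a sketch, not executed here):\n\n```scala\nimport org.apache.spark.sql.types._\n// declaring the column types up front skips the extra inference pass over the file\nval schema = StructType(Seq(\n  StructField(\"date\", StringType),\n  StructField(\"lat\", DoubleType),\n  StructField(\"long\", DoubleType)))\nval data = sparkSession.read.schema(schema).csv(sampleData)\n```"
},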
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "834B1ED9A5C34DFDA352A20863557CBC"
},
"cell_type": "code",
"source": "val data = sparkSession.read.option(\"inferSchema\", \"true\").csv(sampleData).toDF(\"date\",\"lat\",\"long\")",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "data: org.apache.spark.sql.DataFrame = [date: string, lat: double ... 1 more field]\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 26,
"time": "Took: 451 milliseconds, at 2016-11-25 11:48"
}
]
},
{
"metadata": {
"id": "E04B19E885AE4893B41FEA3FE331F3BE"
},
"cell_type": "markdown",
"source": "## Let's view the data first"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "7A54E532B22044F8A35896C6AD12F1B6"
},
"cell_type": "code",
"source": "val latLong = data.select($\"lat\",$\"long\")\nval w = GeoPointsChart(latLong)\nw",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "latLong: org.apache.spark.sql.DataFrame = [lat: double, long: double]\nw: notebook.front.widgets.charts.GeoPointsChart[org.apache.spark.sql.DataFrame] = <GeoPointsChart widget>\nres32: notebook.front.widgets.charts.GeoPointsChart[org.apache.spark.sql.DataFrame] = <GeoPointsChart widget>\n"
},
{
"metadata": {},
"data": {
"text/html": "<div>\n <script data-this=\"{&quot;dataId&quot;:&quot;anon55ff31eb821157a149d30b1e1ac0b25f&quot;,&quot;dataInit&quot;:[{&quot;lat&quot;:-73.1111112212,&quot;long&quot;:45.2},{&quot;lat&quot;:-73.1555555121,&quot;long&quot;:45.20005011},{&quot;lat&quot;:-73.1112232113,&quot;long&quot;:45.20000051},{&quot;lat&quot;:-73.1121243113,&quot;long&quot;:45.20100011},{&quot;lat&quot;:-73.1234123412,&quot;long&quot;:45.20004011},{&quot;lat&quot;:-73.1521233123,&quot;long&quot;:45.20000211},{&quot;lat&quot;:-73.1531231233,&quot;long&quot;:45.20000011},{&quot;lat&quot;:-73.1114423304,&quot;long&quot;:45.21100003},{&quot;lat&quot;:-73.1443144233,&quot;long&quot;:45.22130002},{&quot;lat&quot;:-73.1283500011,&quot;long&quot;:45.21900001}],&quot;genId&quot;:&quot;285105360&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/playground','../javascripts/notebook/magic/geoPointsChart'], \n function(playground, _magicgeoPointsChart) {\n // data ==> data-this (in observable.js's scopedEval) ==> this in JS => { dataId, dataInit, ... 
}\n // this ==> scope (in observable.js's scopedEval) ==> this.parentElement ==> div.container below (toHtml)\n\n playground.call(data,\n this\n ,\n {\n \"f\": _magicgeoPointsChart,\n \"o\": {\"lat\":\"lat\",\"lon\":\"long\",\"width\":600,\"height\":400,\"rField\":null,\"colorField\":null}\n }\n \n \n \n );\n }\n );/*]]>*/</script>\n <div>\n <span class=\"chart-total-item-count\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anona7dbfd4251938d41f0b28b264658b0ac&quot;,&quot;initialValue&quot;:&quot;10&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p> entries total</span>\n <span class=\"chart-sampling-warning\"><p data-bind=\"text: value\"><script data-this=\"{&quot;valueId&quot;:&quot;anon92d0111e9669a2d7cd0c2adaa1c10924&quot;,&quot;initialValue&quot;:&quot;&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId, initialValue)\n },\n this\n );\n});\n /*]]>*/</script></p></span>\n <div>\n </div>\n </div></div>"
},
"output_type": "execute_result",
"execution_count": 28,
"time": "Took: 545 milliseconds, at 2016-11-25 11:48"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "0C76EA19024F4138811A5A7AF4E863B5"
},
"cell_type": "code",
"source": "import org.apache.spark.sql.Row\nval Row(minLat:Double, minLong:Double) = data.select(min($\"lat\"),min($\"long\")).head",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "import org.apache.spark.sql.Row\nminLat: Double = -73.1555555121\nminLong: Double = 45.2\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 19,
"time": "Took: 603 milliseconds, at 2016-11-25 11:38"
}
]
},
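{
"metadata": {
"id": "C9D0E1F2A3B44C5D6E7F8091A2B3C4D5"
},
"cell_type": "markdown",
"source": "Both minima are computed in a single pass and extracted with a `Row` pattern match. An equivalent extraction without the pattern match (a sketch):\n\n```scala\nval row = data.select(min($\"lat\"), min($\"long\")).head\nval minLat = row.getDouble(0)   // -73.1555555121\nval minLong = row.getDouble(1)  // 45.2\n```"
},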
{
"metadata": {
"id": "033642BA545E49E3B5172EF2E9FBF389"
},
"cell_type": "markdown",
"source": "### We transform the dataset relative to the (min(lat), min(long)) point of the bounding box"
},
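{
"metadata": {
"id": "D1E2F3A4B5C64D7E8F90A1B2C3D4E5F6"
},
"cell_type": "markdown",
"source": "Each coordinate becomes an integer offset from its axis minimum, in units of `resolution`. A worked example with the values computed above:\n\n```scala\n// lat -73.1111112212 relative to minLat -73.1555555121, at resolution 0.01:\n((-73.1111112212 - -73.1555555121) / 0.01).toInt  // res: Int = 4\n```"
},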
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "D6E4EA3D95024BCA95194447B911C18D"
},
"cell_type": "code",
"source": "def cellUdf(minValue:Double, res:Double) = udf((x:Double) => ((x-minValue)/res).toInt)\nval latCoordsUdf = cellUdf(minLat, resolution)\nval longCoordsUdf = cellUdf(minLong, resolution)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "cellUdf: (minValue: Double, res: Double)org.apache.spark.sql.expressions.UserDefinedFunction\nlatCoordsUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(DoubleType)))\nlongCoordsUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(DoubleType)))\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 36,
"time": "Took: 617 milliseconds, at 2016-11-25 11:58"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "A15008ABD21E4213A610174D31569FBC"
},
"cell_type": "code",
"source": "val relData = data.withColumn(\"cellx\",latCoordsUdf($\"lat\")).withColumn(\"celly\", longCoordsUdf($\"long\"))",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "relData: org.apache.spark.sql.DataFrame = [date: string, lat: double ... 3 more fields]\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 39,
"time": "Took: 276 milliseconds, at 2016-11-25 11:59"
}
]
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": false,
"id": "8E3AC20565CB4533909D67706CA6EC26"
},
"cell_type": "code",
"source": "relData.show(10)",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "+----------+--------------+-----------+-----+-----+\n| date| lat| long|cellx|celly|\n+----------+--------------+-----------+-----+-----+\n|10/23/2015|-73.1111112212| 45.2| 4| 0|\n|10/23/2015|-73.1555555121|45.20005011| 0| 0|\n|10/23/2015|-73.1112232113|45.20000051| 4| 0|\n|10/20/2015|-73.1121243113|45.20100011| 4| 0|\n|10/20/2015|-73.1234123412|45.20004011| 3| 0|\n|10/23/2015|-73.1521233123|45.20000211| 0| 0|\n|10/23/2015|-73.1531231233|45.20000011| 0| 0|\n|10/23/2015|-73.1114423304|45.21100003| 4| 1|\n|10/23/2015|-73.1443144233|45.22130002| 1| 2|\n|10/23/2015|-73.1283500011|45.21900001| 2| 1|\n+----------+--------------+-----------+-----+-----+\n\n"
},
{
"metadata": {},
"data": {
"text/html": ""
},
"output_type": "execute_result",
"execution_count": 40,
"time": "Took: 400 milliseconds, at 2016-11-25 11:59"
}
]
},
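{
"metadata": {
"id": "E5F6A7B8C9D04E1F2A3B4C5D6E7F8091"
},
"cell_type": "markdown",
"source": "With the cell indices in place, the actual grouping is a plain aggregation. A possible final step (a sketch, not executed in this notebook):"
},
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": true,
"id": "F6A7B8C9D0E14F2A3B4C5D6E7F809102"
},
"cell_type": "code",
"source": "// sketch: count the points falling into each grid cell\nval cellCounts = relData.groupBy($\"cellx\", $\"celly\").count()\ncellCounts.show()",
"outputs": []
},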
{
"metadata": {
"trusted": true,
"input_collapsed": false,
"collapsed": true,
"id": "79F443A4D2904A45ABE00479581D4F36"
},
"cell_type": "code",
"source": "",
"outputs": []
}
],
"nbformat": 4
}