Created
April 13, 2016 20:47
-
-
Save mchalek/d1ff822e96712c80f327f48b4912e121 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def computeLocalDensities( | |
args: LocalPopDensityArgs, | |
country: LocalPopDensityCountryConfig | |
)( | |
implicit sc: SparkContext | |
): RDD[ReverseGeoLabel] = { | |
val radiusMeters = country.radius | |
val countryTags = args.getShapefileTagRDD(country.name) | |
// group by finest available Lx level, to reduce size of secondary testing group | |
val tags: RDD[(LxTag, ReverseGeoLabel)] = countryTags.keyBy(extractTagGroup(_)) | |
tags.persist(MEMORY_AND_DISK_SER) | |
// Reduce grouped coordinates into bounding boxes and pull back to master | |
val tagGroups: Array[TagGroup] = | |
tags.aggregateByKey(GeoBoundingBox.empty)( | |
(gbb, rgl) => gbb.merge(computeBoundingBox(rgl.coordinate.latLon, radiusMeters)), | |
(gbb0, gbb1) => gbb0.merge(gbb1) | |
).map { case (group, boundingBox) => TagGroup(group, boundingBox) }.collect | |
val bcBoundingBoxes = sc.broadcast(tagGroups) | |
val worldPop = loadWorldPop(country.path) | |
val groupHits: RDD[(LxTag, (LatLon, Double))] = | |
worldPop.mapPartitions { iter => | |
val bisector = new BoundingBoxBisector(bcBoundingBoxes.value) | |
iter.flatMap { case pair @ (latLon, _) => | |
val hits = bisector(latLon) | |
hits.map { tagGroup => | |
tagGroup.group -> pair | |
} | |
} | |
} | |
val maxPointsPerMultiPolygon = args.maxPointsPerMultiPolygon | |
val joinedAndSplit: RDD[(ReverseGeoLabel, Iterable[(LatLon, Double)])] = | |
tags.cogroup(groupHits).flatMap { case (_, (observations, densityCoordinates)) => | |
// each obsCoordinate defines a circle against which we must check all densityCoordinates | |
// for inclusion. We want to make sure we don't overload any one task with too may | |
// coordinates to check, so we split up densityCoordinates if necessary. | |
val numSplits = (densityCoordinates.size + maxPointsPerMultiPolygon - 1) / maxPointsPerMultiPolygon | |
val splitSize = (densityCoordinates.size + numSplits - 1) / numSplits | |
val splitDensity = densityCoordinates.grouped(splitSize).toSeq | |
for { | |
obs <- observations | |
densityGroup <- splitDensity | |
} yield { | |
(obs, densityGroup) | |
} | |
}.repartition(1024) | |
val partiallyIntegrated: RDD[(ReverseGeoLabel, PixelAggregator)] = | |
joinedAndSplit.map { case (obs, densityGroup) => | |
val (deltaLat, deltaLon) = getAxes(obs.coordinate.latLon, radiusMeters) | |
val gsf = new GeometricShapeFactory | |
gsf.setWidth(2*deltaLon) | |
gsf.setHeight(2*deltaLat) | |
gsf.setNumPoints(100) | |
val domain = gsf.createEllipse | |
(obs, PixelAggregator.aggregate(densityGroup, domain)) | |
} | |
val fullyIntegrated = | |
partiallyIntegrated.reduceByKey(_+_).map { case (rgl, density) => | |
rgl.copy( | |
shapefileTag = None, | |
popDensityTag = Some(density.mean) | |
) | |
} | |
tags.unpersist() | |
fullyIntegrated | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment