Skip to content

Instantly share code, notes, and snippets.

@mchalek
Created April 13, 2016 20:47
Show Gist options
  • Save mchalek/d1ff822e96712c80f327f48b4912e121 to your computer and use it in GitHub Desktop.
Save mchalek/d1ff822e96712c80f327f48b4912e121 to your computer and use it in GitHub Desktop.
def computeLocalDensities(
args: LocalPopDensityArgs,
country: LocalPopDensityCountryConfig
)(
implicit sc: SparkContext
): RDD[ReverseGeoLabel] = {
val radiusMeters = country.radius
val countryTags = args.getShapefileTagRDD(country.name)
// group by finest available Lx level, to reduce size of secondary testing group
val tags: RDD[(LxTag, ReverseGeoLabel)] = countryTags.keyBy(extractTagGroup(_))
tags.persist(MEMORY_AND_DISK_SER)
// Reduce grouped coordinates into bounding boxes and pull back to master
val tagGroups: Array[TagGroup] =
tags.aggregateByKey(GeoBoundingBox.empty)(
(gbb, rgl) => gbb.merge(computeBoundingBox(rgl.coordinate.latLon, radiusMeters)),
(gbb0, gbb1) => gbb0.merge(gbb1)
).map { case (group, boundingBox) => TagGroup(group, boundingBox) }.collect
val bcBoundingBoxes = sc.broadcast(tagGroups)
val worldPop = loadWorldPop(country.path)
val groupHits: RDD[(LxTag, (LatLon, Double))] =
worldPop.mapPartitions { iter =>
val bisector = new BoundingBoxBisector(bcBoundingBoxes.value)
iter.flatMap { case pair @ (latLon, _) =>
val hits = bisector(latLon)
hits.map { tagGroup =>
tagGroup.group -> pair
}
}
}
val maxPointsPerMultiPolygon = args.maxPointsPerMultiPolygon
val joinedAndSplit: RDD[(ReverseGeoLabel, Iterable[(LatLon, Double)])] =
tags.cogroup(groupHits).flatMap { case (_, (observations, densityCoordinates)) =>
// each obsCoordinate defines a circle against which we must check all densityCoordinates
// for inclusion. We want to make sure we don't overload any one task with too may
// coordinates to check, so we split up densityCoordinates if necessary.
val numSplits = (densityCoordinates.size + maxPointsPerMultiPolygon - 1) / maxPointsPerMultiPolygon
val splitSize = (densityCoordinates.size + numSplits - 1) / numSplits
val splitDensity = densityCoordinates.grouped(splitSize).toSeq
for {
obs <- observations
densityGroup <- splitDensity
} yield {
(obs, densityGroup)
}
}.repartition(1024)
val partiallyIntegrated: RDD[(ReverseGeoLabel, PixelAggregator)] =
joinedAndSplit.map { case (obs, densityGroup) =>
val (deltaLat, deltaLon) = getAxes(obs.coordinate.latLon, radiusMeters)
val gsf = new GeometricShapeFactory
gsf.setWidth(2*deltaLon)
gsf.setHeight(2*deltaLat)
gsf.setNumPoints(100)
val domain = gsf.createEllipse
(obs, PixelAggregator.aggregate(densityGroup, domain))
}
val fullyIntegrated =
partiallyIntegrated.reduceByKey(_+_).map { case (rgl, density) =>
rgl.copy(
shapefileTag = None,
popDensityTag = Some(density.mean)
)
}
tags.unpersist()
fullyIntegrated
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment