Skip to content

Instantly share code, notes, and snippets.

@rbray89
Created August 16, 2018 06:43
Show Gist options
  • Save rbray89/b63c1191024c2bc179ce2cdb9cca7d54 to your computer and use it in GitHub Desktop.
Save rbray89/b63c1191024c2bc179ce2cdb9cca7d54 to your computer and use it in GitHub Desktop.
leaflet on the fly mapnik tile generator
#!/usr/bin/env python2
from mapnik import *
import math
import shutil
from os.path import dirname, basename, splitext, join, exists
import os
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import SocketServer
mapfile = 'mapnik.xml'
class GoogleProjection:
def __init__(self, levels=18):
self.Bc = []
self.Cc = []
self.zc = []
self.Ac = []
c = 256
for d in range(0, levels):
e = c/2
self.Bc.append(c/360.0)
self.Cc.append(c/(2 * math.pi))
self.zc.append((e, e))
self.Ac.append(c)
c *= 2
def fromLLtoPixel(self, ll, zoom):
d = self.zc[zoom]
e = round(d[0] + ll[0] * self.Bc[zoom])
f = minmax(math.sin(math.radians(ll[1])), -0.9999, 0.9999)
g = round(d[1] + 0.5*math.log((1+f)/(1-f))*-self.Cc[zoom])
return (e, g)
def fromPixelToLL(self, px, zoom):
e = self.zc[zoom]
f = (px[0] - e[0])/self.Bc[zoom]
g = (px[1] - e[1])/-self.Cc[zoom]
h = math.degrees(2 * math.atan(math.exp(g)) - 0.5 * math.pi)
return (f, h)
class S(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'image/png')
self.end_headers()
def do_GET(self):
self._set_headers()
render_size = 256
maxZoom = 22
m = Map(render_size, render_size)
merc = Projection('+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over')
tileproj = GoogleProjection(maxZoom+1)
def tileno2bbox(x, y, z):
# Calculate pixel positions of bottom-left & top-right
p0 = (x * 256, (y + 1) * 256)
p1 = ((x + 1) * 256, y * 256)
# Convert to LatLong (EPSG:4326)
l0 = tileproj.fromPixelToLL(p0, z)
l1 = tileproj.fromPixelToLL(p1, z)
# Convert to map projection (e.g. mercator co-ords EPSG:900913)
c0 = merc.forward(Coord(l0[0], l0[1]))
c1 = merc.forward(Coord(l1[0], l1[1]))
# Bounding box for the tile
return Box2d(c0.x, c0.y, c1.x, c1.y)
def tileno2latlong(x, y, z):
# Calculate pixel positions of bottom-left & top-right
p0 = (x * 256, (y + 1) * 256)
p1 = ((x + 1) * 256, y * 256)
# Convert to LatLong (EPSG:4326)
l0 = tileproj.fromPixelToLL(p0, z)
l1 = tileproj.fromPixelToLL(p1, z)
# Convert to map projection (e.g. mercator co-ords EPSG:900913)
c0 = Coord(l0[0], l0[1])
c1 = Coord(l1[0], l1[1])
# Bounding box for the tile
return Box2d(c0.x, c0.y, c1.x, c1.y)
local = './www' + self.path
if os.path.exists(local):
with open(local, 'rb') as f:
self.wfile.write(f.read())
return
dirpath = dirname(local)
if not os.path.exists(dirpath):
os.makedirs(dirpath)
parts = os.path.normpath(self.path)
print self.path
print parts
print parts.split(os.sep)
d, z, x, y = parts.split(os.sep)
y = os.path.splitext(y)[0]
x = int(x)
y = int(y)
z = int(z)
datasources = {
'roads-labels': '',
'roads': 'way[highway];',
'leisure': 'way[leisure];',
'landuse': 'way[landuse];',
'places': 'node[place="city"];node[place="town"];node[place="hamlet"];node[place="suburb"];node[place="neighbourhood"];',
'natural': 'way[natural];',
'water': 'node[water];way[water];relation[water];<<;>>;',
'waterway': 'way[waterway];',
'building': 'way[building];'}
datasourcedup = {
'roads-labels': 'roads'
}
expandedsources = ['roads', 'roads-labels']
emptydb = 'www/empty.spatialite'
emptyosm = 'www/empty.osm'
if z >= 12:
topz = z
topx = x
topy = y
db = 'www/{}/{}/{}.spatialite'
osm = 'www/{}/{}/{}-{}.osm'
mapstring = ''
with open(mapfile, 'r') as f:
mapstring = f.read()
while (topz > 12):
topz -= 1
topx /= 2
topy /= 2
for source in datasources:
if datasources[source] is None:
sdb = emptydb
sosm = emptyosm
elif source in datasourcedup:
sourcedup = datasourcedup[source]
sdb = db.format(topz, topx, topy)
sosm = osm.format(topz, topx, topy, sourcedup)
else:
sdb = db.format(topz, topx, topy)
sosm = osm.format(topz, topx, topy, source)
if not exists(sosm) or not exists(sdb):
print 'fetching {}...'.format(source)
boxlatlong = tileno2latlong(topx, topy, topz)
if source in expandedsources:
expw = boxlatlong.width()/50.0
exph = boxlatlong.height()/50.0
else:
expw = 0.0
exph = 0.0
boxlatlong = Box2d(boxlatlong.minx-expw, boxlatlong.miny-exph, boxlatlong.maxx+expw, boxlatlong.maxy+exph)
import overpass
api = overpass.API('http://overpass-api.de/api/interpreter', timeout=None)
r = api._get_from_overpass('''
[bbox:{0},{1},{2},{3}];
(
{4}
);
out meta;
>;
out skel qt;
'''.format(boxlatlong.miny, boxlatlong.minx, boxlatlong.maxy, boxlatlong.maxx, datasources[source]))
response = r.text
dirpath = dirname(sosm)
if not os.path.exists(dirpath):
os.makedirs(dirpath)
with open(sosm, 'w') as w:
w.write(response)
dirpath = dirname(sdb)
if not os.path.exists(dirpath):
os.makedirs(dirpath)
# CFLAGS="-I /usr/include/gdal" CXXFLAGS="-I /usr/include/gdal" pip install GDAL==2.2.3
# pip install ogrtools
# pip install overpass
import ogrtools.pyogr.ogr2ogr as ogr2ogr
import gdal
gdal.SetConfigOption("OSM_USE_CUSTOM_INDEXING", "NO")
ogr2ogr.ogr2ogr(pszFormat='SQLite',
papszDSCO=['SPATIALITE=YES'],
pszDestDataSource=sdb,
pszDataSource=sosm,
bAppend=True,
bUpdate=True)
with open(sosm, 'w') as w:
w.write('')
mapstring = mapstring.replace('{}.spatialite'.format(source), sdb)
else:
if not exists(emptydb) and not exists(emptyosm):
response = '''<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="NULL" attribution="http://naturalearthdata.com" license="None">
</osm>
'''
with open(emptyosm, 'w') as w:
w.write(response)
if not exists(emptydb):
# CFLAGS="-I /usr/include/gdal" CXXFLAGS="-I /usr/include/gdal" pip install GDAL==2.2.3
# pip install ogrtools
# pip install overpass
import ogrtools.pyogr.ogr2ogr as ogr2ogr
ogr2ogr.ogr2ogr(pszFormat='SQLite',
papszDSCO=['SPATIALITE=YES'],
pszDestDataSource=emptydb,
pszDataSource=emptyosm)
mapstring = ''
with open(mapfile, 'r') as f:
mapstring = f.read()
for source in datasources:
mapstring = mapstring.replace('{}.spatialite'.format(source), emptydb)
load_map_from_string(m, mapstring)
print m.envelope()
bbox = m.envelope()
long = -105.2501
lat = 40.0237
import sys
zoom = float(sys.argv[1])
bounds = tileno2bbox(x, y, z)
m.resize(render_size, render_size)
m.zoom_to_box(bounds)
m.buffer_size = render_size / 2
render_tile_to_file(m, 0, 0, render_size, render_size, local, 'png256')
with open(local, 'rb') as png:
self.wfile.write(png.read())
def do_HEAD(self):
self._set_headers()
def run(server_class=HTTPServer, handler_class=S, port=80):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
print 'Starting httpd...'
httpd.serve_forever()
if __name__ == "__main__":
from sys import argv
if len(argv) == 2:
run(port=int(argv[1]))
else:
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment