Created
July 3, 2024 07:52
-
-
Save lukegre/643787211590f44d3b39a17bfa87168d to your computer and use it in GitHub Desktop.
Download google maps tiles for a given bbox (WSEN). Data is returned as a geolocated xarray.DataArray
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import xarray as xr | |
import rioxarray as rxr | |
from loguru import logger | |
from pygeotile.tile import Tile | |
__package__ = __file__.split('/')[-1].split('.')[0] | |
logger.warning( | |
f"Note that you can only use `{__package__}` for non-commercial " | |
"visualizations and NOT for computations and bulk downloads. ") | |
class GoogleTile(Tile): | |
""" | |
Base class that represents a Google Maps tile based on the pygeotile.Tile class. | |
In this package you should use the parent class `GoogleScene` to download and | |
visualize regions. | |
""" | |
def __new__(cls, *args, map_layer='s', **kwargs): | |
self = super(GoogleTile, cls).__new__(cls, *args, **kwargs) | |
self.TILE_SIZE = 256 | |
self.shape = self.TILE_SIZE, self.TILE_SIZE | |
self.x = self.google[0] | |
self.y = self.google[1] | |
self.set_layer(map_layer) | |
self.map_layer = map_layer | |
self._xr = None | |
self._image = None | |
self._data = None | |
return self | |
def set_layer(self, layer='s'): | |
self._url = self._get_url_for_layer(layer) | |
self.url = self._url.format(x=self.x, y=self.y, z=self.zoom) | |
def _get_url_for_layer(self, layer='s'): | |
__doc__ = """ | |
Returns a URL for a Google Maps tile. | |
Parameters | |
---------- | |
layer : str | |
The layer to use. Valid values are: | |
s: Google Satellite | |
y: Google Satellite Hybrid | |
p: Google Terrain Hybrid | |
t: Google Terrain | |
Invalid that exist, but do not work for this package: | |
m: Google Maps | |
h: Google Roads | |
""" | |
l = layer.lower() | |
valid = '\n' + '\n'.join(__doc__.split('\n')[7:10]) | |
assert l in 'syp', f"Invalid layer: {layer}, valid layers are {valid}" | |
# map tile number to subdomain | |
d = 0 if l in 'msy' else 1 | |
url = f"https://mt{d}.google.com/vt/lyrs={l}&x={{x}}&y={{y}}&z={{z}}" | |
return url | |
@property | |
def bbox(self): | |
lower_left = self.bounds[0] | |
upper_right = self.bounds[1] | |
bbox_WSEN = [ | |
lower_left.longitude, lower_left.latitude, | |
upper_right.longitude, upper_right.latitude] | |
return bbox_WSEN | |
@property | |
def image(self): | |
if self._image is None: | |
self._image = self.get_image() | |
return self._image | |
@property | |
def data(self): | |
if self._data is None: | |
self._data = self.get_data() | |
return self._data | |
@property | |
def xr(self): | |
if self._xr is None: | |
self._xr = self.get_dataarray() | |
return self._xr | |
@property | |
def lat(self): | |
y0 = self.y * self.TILE_SIZE | |
y1 = y0 + self.TILE_SIZE | |
x = self.x * self.TILE_SIZE + self.TILE_SIZE // 2 | |
pixel_range = np.arange(y0, y1, dtype=float) | |
lats = self._from_pixel_loc_to_latlng(x, pixel_range, self.zoom)[0] | |
return lats | |
@property | |
def lon(self): | |
x0 = self.x * self.TILE_SIZE | |
x1 = x0 + self.TILE_SIZE | |
y = self.y * self.TILE_SIZE + self.TILE_SIZE // 2 | |
pixel_range = np.arange(x0, x1, dtype=float) | |
lons = self._from_pixel_loc_to_latlng(pixel_range, y, self.zoom)[1] | |
return lons | |
@staticmethod | |
def _download_url_to_bytes(url): | |
import requests | |
from io import BytesIO | |
response = requests.get(url) | |
io = BytesIO(response.content) | |
return io | |
@staticmethod | |
def get_resolution_from_bbox_and_shape(bbox, shape): | |
from rasterio.transform import from_bounds | |
x, y = shape[:2] | |
transform = from_bounds(*bbox, x, y) | |
res = abs(transform[0]) | |
return res | |
def get_image(self): | |
from PIL import Image | |
file_obj = self._download_url_to_bytes(self.url) | |
image = Image.open(file_obj) | |
return image | |
def get_data(self): | |
image = self.get_image() | |
data = np.array(image) | |
return data | |
def _from_pixel_loc_to_latlng(self, x, y, zoom): | |
import math | |
pixels_per_lon_degree = self.TILE_SIZE / 360.0 | |
pixels_per_lon_radian = self.TILE_SIZE / (2 * math.pi) | |
x0 = self.TILE_SIZE / 2.0 | |
y0 = self.TILE_SIZE / 2.0 | |
num_tiles = 1 << zoom | |
x /= num_tiles | |
y /= num_tiles | |
lon = (x - x0) / pixels_per_lon_degree | |
lat_radians = (y - y0) / -pixels_per_lon_radian | |
lat = np.rad2deg(2 * np.arctan(np.exp(lat_radians)) - np.pi / 2) | |
return lat, lon | |
def get_dataarray(self): | |
data = self.get_data() | |
da = xr.DataArray( | |
data=data, | |
dims=["y", "x", "band"], | |
coords={ | |
"y": self.lat, | |
"x": self.lon, | |
"band": "r g b".split(), | |
}, | |
).transpose("band", "y", "x") | |
da = ( | |
da | |
.rio.write_crs("EPSG:4326")) | |
return da | |
class GoogleScene: | |
def __init__(self, bbox, load=False, max_pixels=2048, map_layer='s'): | |
""" | |
Get geolocated data from Google Maps in an xr.DataArray. | |
Parameters | |
---------- | |
bbox : list | |
A list of 4 floats representing the bounding box in WSEN order. | |
load : bool | |
If True, the data will be downloaded and loaded into memory on initialization. | |
Otherwise, the data will be downloaded when the `xr` attribute is accessed. | |
max_pixels : int | |
a rough limit on the image size. note that images are often larger or smaller | |
than this limit since we use the mean of the bbox to calculate the zoom level. | |
map_layer : str | |
The map layer to use. Valid values are: | |
s: Google Satellite (default) | |
y: Google Satellite Hybrid | |
p: Google Terrain Hybrid | |
t: Google Terrain (not recommended) | |
Examples | |
-------- | |
>>> bbox = [8.29, 47.26, 8.75, 47.50] | |
>>> scene = GoogleScene(bbox, load=True) | |
>>> scene # plots map in Jupyter notebook or shows info in console | |
""" | |
self.bbox = bbox | |
self.max_pixels = max_pixels | |
self.map_layer = map_layer | |
self._data = None | |
self._xr = None | |
self._zoom_res = self._get_zoom_level_resolution() | |
self.zoom = self._get_zoom_level() | |
self._tiles = None | |
if load: | |
_ = self.xr | |
@property | |
def tiles(self): | |
if self._tiles is None: | |
self._tiles = self.get_tiles() | |
return self._tiles | |
@property | |
def xr(self): | |
if self._xr is None: | |
self._xr = self._get_dataarray() | |
return self._xr | |
@property | |
def data(self): | |
if self._data is None: | |
self._data = self.xr.data | |
return self._data | |
def plot(self, ax=None, dpi=100, **kwargs): | |
import matplotlib.pyplot as plt | |
da = self.xr | |
if ax is None: | |
ry, rx = da.y.size, da.x.size | |
pixel_ratio = ry / rx | |
width = 10 | |
fig, ax = plt.subplots(figsize=(width, width * pixel_ratio), dpi=dpi) | |
else: | |
fig = ax.get_figure() | |
props = dict() | |
props.update(kwargs) | |
da.plot.imshow(ax=ax, **props) | |
ax.xaxis.set_visible(False) | |
ax.yaxis.set_visible(False) | |
ax.set_title("") | |
ax.set_position([0, 0, 1, 1]) | |
return fig, ax | |
def _get_zoom_level_resolution(self): | |
zoom_levels = range(20) | |
res_m = {} | |
for z in zoom_levels: | |
tile = GoogleTile.for_latitude_longitude(0, 0, z) | |
bbox = tile.bbox | |
tile.shape | |
res_deg = GoogleTile.get_resolution_from_bbox_and_shape(bbox, (256, 256)) | |
res_m[z] = res_deg * 111139 | |
return res_m | |
def _get_zoom_level(self): | |
dx = abs(self.bbox[2] - self.bbox[0]) | |
dy = abs(self.bbox[3] - self.bbox[1]) | |
bbox_size_m = max(dx, dy) * 111139 | |
for z, res in self._zoom_res.items(): | |
n_pixels_in_bbox_at_res = bbox_size_m / res | |
if n_pixels_in_bbox_at_res > self.max_pixels: | |
return z | |
def get_tiles(self): | |
zoom = 19 if self.zoom is None else self.zoom | |
west, south, east, north = self.bbox | |
ll = GoogleTile.for_latitude_longitude(south, west, zoom) | |
ur = GoogleTile.for_latitude_longitude(north, east, zoom) | |
x0, x1 = min(ll.x, ur.x), max(ll.x, ur.x) | |
y0, y1 = min(ll.y, ur.y), max(ll.y, ur.y) | |
x_range = range(x0, x1 + 1) | |
y_range = range(y0, y1 + 1) | |
self.n_xtiles = len(x_range) | |
self.n_ytiles = len(y_range) | |
tiles = [] | |
for x in x_range: | |
for y in y_range: | |
tile = GoogleTile.from_google(x, y, zoom) | |
tile.set_layer(self.map_layer) | |
tiles += tile, | |
return tiles | |
def _parallel_tile_download(self, tiles, n_jobs=12): | |
import joblib | |
func = joblib.delayed(lambda tile: tile.xr) | |
tasks = [func(tile) for tile in tiles] | |
_ = joblib.Parallel(n_jobs=n_jobs, verbose=0, backend='threading')(tasks) | |
def _get_dataarray(self): | |
n_tiles = len(self.tiles) | |
n_jobs = min(24, n_tiles) | |
logger.info(f"Downloading {n_tiles} tiles with {n_jobs} jobs") | |
self._parallel_tile_download(self.tiles, n_jobs=n_jobs) | |
da_list = [tile.xr for tile in self.tiles] | |
logger.info("Combining tiles with xarray.combine_by_coords") | |
da = xr.combine_by_coords(da_list) | |
da = da.sortby("y").sortby("x") | |
da = da.sel( | |
x=slice(self.bbox[0], self.bbox[2]), | |
y=slice(self.bbox[1], self.bbox[3])) | |
return da | |
def _repr_html_(self): | |
if self._xr is None: | |
pass | |
else: | |
self.plot() | |
def __repr__(self): | |
n_tiles = len(self.tiles) | |
if self._xr is None: | |
tile_shape = self.tiles[0].shape | |
y, x = self.n_ytiles * tile_shape[1], self.n_xtiles * tile_shape[0] | |
xy_str = f"[x<={x}, y<={y}]" | |
else: | |
y, x = self.xr.y.size, self.xr.x.size | |
xy_str = f"[x={x}, y={y}]" | |
return f"GoogleScene(bbox={self.bbox}, shape={xy_str}, zoom={self.zoom}, n_tiles={n_tiles})" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment