Created
January 13, 2017 14:47
-
-
Save stuartlynn/dc6f987a0516efa3a8f24069c235cb4b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pandas as pd | |
| from urllib import urlencode | |
| from IPython.display import IFrame | |
| from urllib import urlencode | |
| from collections import OrderedDict | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| import matplotlib.animation as animation | |
| import urllib2 | |
| import requests | |
| import json | |
| from pylab import * | |
class Carto:
    """Thin client for a single CARTO account.

    Stores the account name and an optional API key, and offers helpers to
    embed a visualization in a notebook and to run SQL through the SQL API.
    """

    def __init__(self, username, key=None):
        """Remember the account name and the (optional) API key.

        username -- CARTO account name; it becomes the API sub-domain.
        key      -- API key, or None to send unauthenticated requests.
        """
        self.username = username
        self.key = key

    def cdb_map(self, viz_json, width=700, height=700):
        """Return an IPython ``IFrame`` that embeds the visualization.

        viz_json      -- identifier of the visualization to embed.
        width, height -- size handed to the iframe. They are now honoured;
                         previously 700x700 was hard-coded regardless of
                         what the caller passed.
        """
        src = 'https://team.cartodb.com/u/{username}/viz/{viz}/embed_map'.format(
            viz=viz_json, username=self.username)
        return IFrame(src, width=width, height=height)

    def _sql_url(self, query):
        """Build the SQL API URL that returns ``query`` as CSV."""
        base = 'https://{username}.carto.com/api/v2/sql?'.format(
            username=self.username)
        params = [('q', query)]
        # Only send the key when there is one: urlencode would otherwise
        # serialise None as the literal string "None".
        if self.key is not None:
            params.append(('api_key', self.key))
        params.append(('format', 'csv'))
        return base + urlencode(params)

    def cdb_query(self, query, username='stuartlynn'):
        """Run ``query`` against this account and return a pandas DataFrame.

        query    -- SQL statement to execute.
        username -- accepted for backward compatibility only; it is ignored
                    and the request always targets ``self.username``.
        """
        return pd.read_csv(self._sql_url(query))
class StaticImage:
    """Builds a Maps API layer configuration and turns it into a static image URL.

    Layers are kept in insertion order in ``self.layers``; the special name
    ``'base_layer'`` is always emitted first in the generated configuration.
    """

    def __init__(self, carto_tools):
        """Copy the account name and API key from ``carto_tools``.

        carto_tools -- object exposing ``username`` and ``key`` attributes.
        """
        self.account = carto_tools.username
        self.key = carto_tools.key
        self.layers = OrderedDict()

    def set_css_for_layer(self, css, layer_name):
        """Set the CartoCSS of ``layer_name``, creating the layer if needed."""
        if layer_name in self.layers:
            # Layers store their style under 'cartocss' (see
            # create_mapnik_layer), so that is the key to overwrite.
            self.layers[layer_name]['options']['cartocss'] = css
        else:
            self.create_mapnik_layer('', css, layer_name)

    def set_sql_for_layer(self, sql, layer_name):
        """Set the SQL of ``layer_name``, creating the layer if needed.

        The most recent SQL is also kept on ``self.sql``.
        """
        self.sql = sql
        if layer_name in self.layers:
            self.layers[layer_name]['options']['sql'] = sql
        else:
            self.create_mapnik_layer(sql, '', layer_name)

    def create_mapnik_layer(self, sql, css, name):
        """Register (or replace) a mapnik layer called ``name``."""
        self.layers[name] = {
            "type": "mapnik",
            "options": {
                "cartocss_version": "2.1.1",
                "cartocss": css,
                "sql": sql,
            },
        }

    def add_simple_layer(self, url, color, image):
        """Install a plain base layer and return it.

        url   -- unused; kept so existing callers keep working.
        color -- background colour, stored under 'color' when truthy.
        image -- background image URL, stored under 'imageUrl' when truthy
                 (previously the value itself was used as the option key).
        """
        options = {}
        if color:
            options['color'] = color
        if image:
            options['imageUrl'] = image
        self.layers['base_layer'] = {"type": "plain", "options": options}
        return self.layers['base_layer']

    def add_base_layer(self, url):
        """Install an HTTP tile base layer using ``url`` as the tile template."""
        self.layers['base_layer'] = {
            "type": "http",
            "options": {
                "urlTemplate": url,
                "subdomains": ["a", "b", "c"],
            },
        }

    def construct_query(self):
        """Return the layer configuration dict, base layer first."""
        layers = []
        if 'base_layer' in self.layers:
            layers.append(self.layers['base_layer'])
        # items() instead of iteritems() so this runs on Python 2 and 3.
        layers.extend(layer for name, layer in self.layers.items()
                      if name != 'base_layer')
        return {"version": "1.3.0", "layers": layers}

    def map_url(self):
        """Return the endpoint the layer configuration is posted to."""
        url = "http://{user}.cartodb.com/api/v1/map".format(user=self.account)
        # Without a key, leave the parameter off instead of sending "key=None".
        if self.key is not None:
            url += "?key={key}".format(key=self.key)
        return url

    def static_map_url(self, token, width, height, bounds):
        """Return the PNG URL for the map identified by ``token``.

        bounds -- either a ready-made string, or a list/tuple of coordinates
                  which is joined with commas.
        """
        if isinstance(bounds, (list, tuple)):
            bounds = ','.join(str(coord) for coord in bounds)
        return ("http://{user}.cartodb.com/api/v1/map/static/bbox/"
                "{token}/{bbox}/{width}/{height}.png").format(
            user=self.account,
            token=token,
            width=width,
            height=height,
            bbox=bounds)

    def getMap(self):
        """Post the layer configuration and return the ``layergroupid``.

        Raises KeyError (with the response payload in the message) when the
        response carries no ``layergroupid``.
        """
        response = requests.post(
            self.map_url(),
            data=json.dumps(self.construct_query()),
            headers={'content-type': 'application/json'})
        payload = response.json()
        if 'layergroupid' not in payload:
            raise KeyError(
                'layergroupid missing from Maps API response: {0!r}'.format(payload))
        return payload['layergroupid']

    def getStaticImage(self, width, height, bounds):
        """Instantiate the map and return the URL of its static image."""
        token = self.getMap()
        return self.static_map_url(token, width, height, bounds)
class MapAnimHelper(object):
    """Callable frame updater for a matplotlib animation of map images.

    Downloads every image in ``urls`` up front and shows them one after
    another on a single axes, optionally with a per-frame title.
    """

    def __init__(self, urls, titles=None):
        """Create the figure and fetch all frames.

        urls   -- iterable of image URLs, one per frame.
        titles -- optional sequence indexed by frame number.
        """
        figure = plt.figure()
        axes = figure.add_subplot(111)
        axes.set_aspect('equal')
        for axis in (axes.get_xaxis(), axes.get_yaxis()):
            axis.set_visible(False)
        figure.set_size_inches([10, 10])

        self.fig = figure
        self.titles = titles
        self.ax = axes

        loaded = []
        for url in urls:
            image = plt.imread(urllib2.urlopen(url))
            # Keep every channel but the last (presumably alpha -- confirm).
            loaded.append(image[:, :, :-1])
        self.animation_frames = loaded

    def init(self):
        """Draw the first frame and return the image artist."""
        first_frame = self.animation_frames[0]
        self.im = self.ax.imshow(first_frame)
        return self.im

    def __call__(self, n):
        """Show frame ``n`` (and its title, if any); return the image artist."""
        self.im.set_data(self.animation_frames[n])
        if not self.titles:
            return self.im
        self.ax.set_title(str(self.titles[n]), size=30, color='black')
        return self.im
class AnimatedMap(object):
    """Renders a sequence of static maps and stitches them into a video.

    Each layer carries an SQL template plus one parameter dict per frame;
    frame ``i`` is rendered by formatting the SQL with the i-th dict.
    """

    def __init__(self, carto_tools):
        """Keep the account helper and start with no layers or titles."""
        self.carto_tools = carto_tools
        self.layers = OrderedDict()
        # Initialised here so generate_animation can read it even when
        # set_titles was never called.
        self.titles = None

    def add_base_layer(self, url):
        """Use ``url`` as the tile template of the base layer."""
        self.layers['base_layer'] = url

    def add_layer(self, name, sql, css, frame_parameters):
        """Register an animated layer.

        sql              -- ``str.format`` template for the layer query.
        css              -- CartoCSS, used verbatim for every frame.
        frame_parameters -- sequence of dicts, one per frame, fed to ``sql``.
        """
        self.layers[name] = {
            'sql': sql,
            'css': css,
            'frame_parameters': frame_parameters,
        }

    def set_titles(self, titles):
        """Store per-frame titles used when generate_animation gets none."""
        self.titles = titles

    def _frame_count(self):
        """Return the largest number of frames declared by any data layer."""
        # The base layer is stored as a plain URL string, not a dict, so it
        # must be skipped here.
        counts = [len(layer['frame_parameters'])
                  for name, layer in self.layers.items()
                  if name != 'base_layer']
        return max(counts) if counts else 0

    def generate_frame(self, frame, bounds, width, height):
        """Render frame number ``frame`` and return its static image URL."""
        static = StaticImage(self.carto_tools)
        for name, layer in self.layers.items():
            if name == 'base_layer':
                static.add_base_layer(layer)
                continue
            sql = layer['sql'].format(**layer['frame_parameters'][frame])
            # CSS is passed through untemplated (presumably because CartoCSS
            # braces would clash with str.format -- confirm).
            static.create_mapnik_layer(sql, layer['css'], name)
        return static.getStaticImage(width, height, bounds)

    def generate_frames(self, bounds, width, height):
        """Return the static image URL of every frame (empty if no layers)."""
        return [self.generate_frame(frame, bounds, width, height)
                for frame in range(self._frame_count())]

    def generate_animation(self, out_file, bounds, width, height, duration, titles=None):
        """Render all frames and save them as a video at ``out_file``.

        duration -- divided by the frame count to get the per-frame
                    ``interval`` handed to FuncAnimation.
        titles   -- per-frame titles; falls back to those given to
                    set_titles when omitted.

        Returns ``out_file``. Raises ValueError when there is nothing to
        animate.
        """
        frames = self.generate_frames(bounds, width, height)
        if not frames:
            raise ValueError('cannot animate a map with no frames; add a layer first')
        if titles is None:
            titles = self.titles
        helper = MapAnimHelper(frames, titles)
        ani = animation.FuncAnimation(
            helper.fig,
            helper,
            frames=len(frames),
            # float() avoids Python 2 integer division truncating the interval.
            interval=float(duration) / len(frames),
            init_func=helper.init,
        )
        # NOTE(review): the writer is fixed at 1 fps, so ``duration`` may not
        # influence the saved file -- confirm the intended frame rate.
        writer = animation.writers['ffmpeg'](fps=1)
        ani.save(out_file, writer=writer, dpi=200)
        return out_file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pandas as pd | |
| from urllib import urlencode | |
| from IPython.display import IFrame | |
| from urllib import urlencode | |
| from collections import OrderedDict | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| import matplotlib.animation as animation | |
| import urllib2 | |
| import requests | |
| import json | |
| from pylab import * | |
class Carto:
    """Thin client for a single CARTO account.

    Stores the account name and an optional API key, and offers helpers to
    embed a visualization in a notebook and to run SQL through the SQL API.
    """

    def __init__(self, username, key=None):
        """Remember the account name and the (optional) API key.

        username -- CARTO account name; it becomes the API sub-domain.
        key      -- API key, or None to send unauthenticated requests.
        """
        self.username = username
        self.key = key

    def cdb_map(self, viz_json, width=700, height=700):
        """Return an IPython ``IFrame`` that embeds the visualization.

        viz_json      -- identifier of the visualization to embed.
        width, height -- size handed to the iframe. They are now honoured;
                         previously 700x700 was hard-coded regardless of
                         what the caller passed.
        """
        src = 'https://team.cartodb.com/u/{username}/viz/{viz}/embed_map'.format(
            viz=viz_json, username=self.username)
        return IFrame(src, width=width, height=height)

    def _sql_url(self, query):
        """Build the SQL API URL that returns ``query`` as CSV."""
        base = 'https://{username}.carto.com/api/v2/sql?'.format(
            username=self.username)
        params = [('q', query)]
        # Only send the key when there is one: urlencode would otherwise
        # serialise None as the literal string "None".
        if self.key is not None:
            params.append(('api_key', self.key))
        params.append(('format', 'csv'))
        return base + urlencode(params)

    def cdb_query(self, query, username='stuartlynn'):
        """Run ``query`` against this account and return a pandas DataFrame.

        query    -- SQL statement to execute.
        username -- accepted for backward compatibility only; it is ignored
                    and the request always targets ``self.username``.
        """
        return pd.read_csv(self._sql_url(query))
class StaticImage:
    """Builds a Maps API layer configuration and turns it into a static image URL.

    Layers are kept in insertion order in ``self.layers``; the special name
    ``'base_layer'`` is always emitted first in the generated configuration.
    """

    def __init__(self, carto_tools):
        """Copy the account name and API key from ``carto_tools``.

        carto_tools -- object exposing ``username`` and ``key`` attributes.
        """
        self.account = carto_tools.username
        self.key = carto_tools.key
        self.layers = OrderedDict()

    def set_css_for_layer(self, css, layer_name):
        """Set the CartoCSS of ``layer_name``, creating the layer if needed."""
        if layer_name in self.layers:
            # Layers store their style under 'cartocss' (see
            # create_mapnik_layer), so that is the key to overwrite.
            self.layers[layer_name]['options']['cartocss'] = css
        else:
            self.create_mapnik_layer('', css, layer_name)

    def set_sql_for_layer(self, sql, layer_name):
        """Set the SQL of ``layer_name``, creating the layer if needed.

        The most recent SQL is also kept on ``self.sql``.
        """
        self.sql = sql
        if layer_name in self.layers:
            self.layers[layer_name]['options']['sql'] = sql
        else:
            self.create_mapnik_layer(sql, '', layer_name)

    def create_mapnik_layer(self, sql, css, name):
        """Register (or replace) a mapnik layer called ``name``."""
        self.layers[name] = {
            "type": "mapnik",
            "options": {
                "cartocss_version": "2.1.1",
                "cartocss": css,
                "sql": sql,
            },
        }

    def add_simple_layer(self, url, color, image):
        """Install a plain base layer and return it.

        url   -- unused; kept so existing callers keep working.
        color -- background colour, stored under 'color' when truthy.
        image -- background image URL, stored under 'imageUrl' when truthy
                 (previously the value itself was used as the option key).
        """
        options = {}
        if color:
            options['color'] = color
        if image:
            options['imageUrl'] = image
        self.layers['base_layer'] = {"type": "plain", "options": options}
        return self.layers['base_layer']

    def add_base_layer(self, url):
        """Install an HTTP tile base layer using ``url`` as the tile template."""
        self.layers['base_layer'] = {
            "type": "http",
            "options": {
                "urlTemplate": url,
                "subdomains": ["a", "b", "c"],
            },
        }

    def construct_query(self):
        """Return the layer configuration dict, base layer first."""
        layers = []
        if 'base_layer' in self.layers:
            layers.append(self.layers['base_layer'])
        # items() instead of iteritems() so this runs on Python 2 and 3.
        layers.extend(layer for name, layer in self.layers.items()
                      if name != 'base_layer')
        return {"version": "1.3.0", "layers": layers}

    def map_url(self):
        """Return the endpoint the layer configuration is posted to."""
        url = "http://{user}.cartodb.com/api/v1/map".format(user=self.account)
        # Without a key, leave the parameter off instead of sending "key=None".
        if self.key is not None:
            url += "?key={key}".format(key=self.key)
        return url

    def static_map_url(self, token, width, height, bounds):
        """Return the PNG URL for the map identified by ``token``.

        bounds -- either a ready-made string, or a list/tuple of coordinates
                  which is joined with commas.
        """
        if isinstance(bounds, (list, tuple)):
            bounds = ','.join(str(coord) for coord in bounds)
        return ("http://{user}.cartodb.com/api/v1/map/static/bbox/"
                "{token}/{bbox}/{width}/{height}.png").format(
            user=self.account,
            token=token,
            width=width,
            height=height,
            bbox=bounds)

    def getMap(self):
        """Post the layer configuration and return the ``layergroupid``.

        Raises KeyError (with the response payload in the message) when the
        response carries no ``layergroupid``.
        """
        response = requests.post(
            self.map_url(),
            data=json.dumps(self.construct_query()),
            headers={'content-type': 'application/json'})
        payload = response.json()
        if 'layergroupid' not in payload:
            raise KeyError(
                'layergroupid missing from Maps API response: {0!r}'.format(payload))
        return payload['layergroupid']

    def getStaticImage(self, width, height, bounds):
        """Instantiate the map and return the URL of its static image."""
        token = self.getMap()
        return self.static_map_url(token, width, height, bounds)
class MapAnimHelper(object):
    """Callable frame updater for a matplotlib animation of map images.

    Downloads every image in ``urls`` up front and shows them one after
    another on a single axes, optionally with a per-frame title.
    """

    def __init__(self, urls, titles=None):
        """Create the figure and fetch all frames.

        urls   -- iterable of image URLs, one per frame.
        titles -- optional sequence indexed by frame number.
        """
        figure = plt.figure()
        axes = figure.add_subplot(111)
        axes.set_aspect('equal')
        for axis in (axes.get_xaxis(), axes.get_yaxis()):
            axis.set_visible(False)
        figure.set_size_inches([10, 10])

        self.fig = figure
        self.titles = titles
        self.ax = axes

        loaded = []
        for url in urls:
            image = plt.imread(urllib2.urlopen(url))
            # Keep every channel but the last (presumably alpha -- confirm).
            loaded.append(image[:, :, :-1])
        self.animation_frames = loaded

    def init(self):
        """Draw the first frame and return the image artist."""
        first_frame = self.animation_frames[0]
        self.im = self.ax.imshow(first_frame)
        return self.im

    def __call__(self, n):
        """Show frame ``n`` (and its title, if any); return the image artist."""
        self.im.set_data(self.animation_frames[n])
        if not self.titles:
            return self.im
        self.ax.set_title(str(self.titles[n]), size=30, color='black')
        return self.im
class AnimatedMap(object):
    """Renders a sequence of static maps and stitches them into a video.

    Each layer carries an SQL template plus one parameter dict per frame;
    frame ``i`` is rendered by formatting the SQL with the i-th dict.
    """

    def __init__(self, carto_tools):
        """Keep the account helper and start with no layers or titles."""
        self.carto_tools = carto_tools
        self.layers = OrderedDict()
        # Initialised here so generate_animation can read it even when
        # set_titles was never called.
        self.titles = None

    def add_base_layer(self, url):
        """Use ``url`` as the tile template of the base layer."""
        self.layers['base_layer'] = url

    def add_layer(self, name, sql, css, frame_parameters):
        """Register an animated layer.

        sql              -- ``str.format`` template for the layer query.
        css              -- CartoCSS, used verbatim for every frame.
        frame_parameters -- sequence of dicts, one per frame, fed to ``sql``.
        """
        self.layers[name] = {
            'sql': sql,
            'css': css,
            'frame_parameters': frame_parameters,
        }

    def set_titles(self, titles):
        """Store per-frame titles used when generate_animation gets none."""
        self.titles = titles

    def _frame_count(self):
        """Return the largest number of frames declared by any data layer."""
        # The base layer is stored as a plain URL string, not a dict, so it
        # must be skipped here.
        counts = [len(layer['frame_parameters'])
                  for name, layer in self.layers.items()
                  if name != 'base_layer']
        return max(counts) if counts else 0

    def generate_frame(self, frame, bounds, width, height):
        """Render frame number ``frame`` and return its static image URL."""
        static = StaticImage(self.carto_tools)
        for name, layer in self.layers.items():
            if name == 'base_layer':
                static.add_base_layer(layer)
                continue
            sql = layer['sql'].format(**layer['frame_parameters'][frame])
            # CSS is passed through untemplated (presumably because CartoCSS
            # braces would clash with str.format -- confirm).
            static.create_mapnik_layer(sql, layer['css'], name)
        return static.getStaticImage(width, height, bounds)

    def generate_frames(self, bounds, width, height):
        """Return the static image URL of every frame (empty if no layers)."""
        return [self.generate_frame(frame, bounds, width, height)
                for frame in range(self._frame_count())]

    def generate_animation(self, out_file, bounds, width, height, duration, titles=None):
        """Render all frames and save them as a video at ``out_file``.

        duration -- divided by the frame count to get the per-frame
                    ``interval`` handed to FuncAnimation.
        titles   -- per-frame titles; falls back to those given to
                    set_titles when omitted.

        Returns ``out_file``. Raises ValueError when there is nothing to
        animate.
        """
        frames = self.generate_frames(bounds, width, height)
        if not frames:
            raise ValueError('cannot animate a map with no frames; add a layer first')
        if titles is None:
            titles = self.titles
        helper = MapAnimHelper(frames, titles)
        ani = animation.FuncAnimation(
            helper.fig,
            helper,
            frames=len(frames),
            # float() avoids Python 2 integer division truncating the interval.
            interval=float(duration) / len(frames),
            init_func=helper.init,
        )
        # NOTE(review): the writer is fixed at 1 fps, so ``duration`` may not
        # influence the saved file -- confirm the intended frame rate.
        writer = animation.writers['ffmpeg'](fps=1)
        ani.save(out_file, writer=writer, dpi=200)
        return out_file
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment