Skip to content

Instantly share code, notes, and snippets.

@dersteppenwolf
Forked from javisantana/named_maps.py
Last active August 29, 2015 14:19
Show Gist options
  • Save dersteppenwolf/282252a9179ba86cc4b6 to your computer and use it in GitHub Desktop.
Save dersteppenwolf/282252a9179ba86cc4b6 to your computer and use it in GitHub Desktop.
import requests
import json
import random
import urllib
class MapInstance(object):
def __init__(self, definition, url, headers=None):
self.url = url
self.definition = definition
self.headers = headers
def attribute(self, layer, feature_id, token=None):
url = [
self.url,
"api", "v1", "map",
self.layergroupid(),
layer,
'attributes',
feature_id
]
url = '/'.join(map(str, url))
if token:
url += '?auth_token=' + token
r = requests.get(url)
if r.status_code != 200:
raise ForbiddenException(r.status_code, r.content)
return r.json()
def tile(self, z, x, y, layer=None, format='png', token=None, cache=True):
tile = [
self.url,
"api", "v1", "map",
self.layergroupid(),
]
if layer != None:
tile.append(layer)
tile += [ z, x, str(y) + "." + format ]
url = '/'.join(map(str, tile))
if token:
url += '?auth_token=' + token
if not cache:
if token:
url += "&"
else:
url += "?"
url += "t=" + str(random.random())
r = requests.get(url)
if r.status_code != 200:
raise ForbiddenException(r.status_code, r.content)
return r
def layergroupid(self):
#return self.definition['layergroupid'].split('@')[-1]
return self.definition['layergroupid']
def last_updated(self):
return self.definition['last_updated']
def __str__(self):
return str(self.definition)
class ForbiddenException(Exception):
def __init__(self, status_code, info):
self.status_code = status_code
self.info = info
def __str__(self):
return repr(self.status_code)
class NamedMap(object):
def __init__(self, name, tmpl, parent):
self.name = name
self.tmpl = tmpl
self.parent = parent
def template(self):
return self.tmpl
def delete(self):
return requests.delete(self.url() + '?api_key=' + self.parent.api_key)
def update():
r = requests.put(self.url + '?api_key=' + self.parent.api_key,
data=json.dumps(self.tmpl),
headers=self.parent.headers)
return self
def url(self):
return self.parent.url + "/" + self.name
def instance(self, token=None, *args, **kwargs):
ins_url = self.url ()
if token:
ins_url += "?auth_token=" + token
r = requests.post(ins_url,
data=json.dumps(kwargs),
headers=self.parent.headers)
if r.status_code != 200:
raise ForbiddenException(r.status_code, r.content)
return MapInstance(r.json(), self.parent.host)
def __str__(self):
return str(self.tmpl)
class AnonymousMaps(object):
def __init__(self, url, tmpl, api_key=None):
self.headers = {'Content-Type': 'application/json'}
self.host = url
self.url = url + '/api/v1/map'
self.api_key = api_key
self.tmpl = tmpl
def apiKey(self, k):
self.api_key = k
return self
def instance(self):
url = self.url
if self.api_key:
url += '?api_key=' + self.api_key
r = requests.post(url,
data=json.dumps(self.tmpl),
headers=self.headers)
if r.status_code == 200:
return MapInstance(r.json(), self.host)
raise ForbiddenException(r.status_code, r.json())
def instance_get(self, *args, **kwargs):
ins_url = self.url
ins_url += '?' + urllib.urlencode({ "config": json.dumps(self.tmpl) })
print ins_url
r = requests.get(ins_url,
headers=self.headers)
if r.status_code != 200:
raise ForbiddenException(r.status_code, r.json())
return MapInstance(r.json(), self.host, r.headers)
class NamedMaps(object):
def __init__(self, url, api_key):
self.headers = {'content-type': 'application/json'}
self.host = url
self.url = url + '/api/v1/map/named'
self.api_key = api_key
def create(self, tmpl):
r = requests.post(self.url + '?api_key=' + self.api_key,
data=json.dumps(tmpl),
headers=self.headers)
if r.status_code == 200:
return NamedMap(r.json()['template_id'], r.json(), self)
return None
def all(self):
r = requests.get(self.url + '?api_key=' + self.api_key, headers=self.headers)
return r.json()
def get(self, name):
r = requests.get(self.url + "/" + name + '?api_key=' + self.api_key, headers=self.headers)
if r.status_code == 200:
return NamedMap(name, r.json(), self)
return None
# this allows test named maps in an server
# the server should have a table named world_borders
import unittest
import sys
import copy
import random
from cartodb import CartoDBOAuth as CartoDB, CartoDBException, CartoDBAPIKey
from named_maps import *
template = {
'version': '0.0.1',
'name': 'testing3',
'auth': {
'method': 'open',
},
'layergroup': {
"version": "1.0.1",
"layers": [{
"type": "cartodb",
"options": {
"cartocss_version": "2.1.1",
"cartocss": "#layer{polygon-fill: #000;line-color: red; }",
"sql": "select * from tm_world_borders_simpl_0_8",
"interactivity": "cartodb_id",
"attributes": { "id": "cartodb_id", "columns": ["iso3", "cartodb_id"] }
}
},
{
"type": "torque",
"options": {
"cartocss_version": "2.1.1",
"cartocss": ' Map{ -torque-time-attribute: "cartodb_id";-torque-aggregation-function: "count(cartodb_id)";-torque-frame-count: 760;-torque-resolution: 2}',
"sql": "select cartodb_id,the_geom, st_centroid(the_geom_webmercator) as the_geom_webmercator from tm_world_borders_simpl_0_8"
#"sql": "select * from tm_world_borders_simpl_0_8"
}
}
]
}
}
API_KEY = None
url = None
#API_KEY = '8bcf70997dc1d2acd7520638e226271d6589e579'
#url = 'http://dev.localhost.lan:8181'
class MapsAPITest(unittest.TestCase):
def setUp(self):
self.maps = NamedMaps(url, API_KEY)
self.template = copy.deepcopy(template)
self.created_maps = []
def tearDown(self):
for x in self.created_maps:
x.delete()
def random_name(self):
return 'map_test_' + str(random.randint(0, sys.maxint))
def create_test_map(self, name=None):
if not name:
name = self.random_name()
self.template['name'] = name
m1 = self.maps.create(self.template)
self.created_maps.append(m1)
return m1
def test_create(self):
m1 = self.create_test_map()
self.assertTrue(self.template['name'] in m1.name)
def test_instance(self):
m1 = self.create_test_map()
ins = m1.instance()#'auth_token2', color='#FFF')
self.assertTrue(len(ins.layergroupid()) > 10)
def test_tile(self):
m1 = self.create_test_map()
ins = m1.instance('auth_token2', color='#FFF')
r = ins.tile(0, 0, 0)
f = r.content
self.assertIsPng(f)
r = ins.tile(0, 0, 0, layer=1, format="json.torque")
f = r.json()
self.assertTrue(len(f) > 0)
self.assertTrue('x__uint8' in f[0])
def test_list(self):
old = self.maps.all()['template_ids']
name1 = self.random_name()
name2 = self.random_name()
m1 = self.create_test_map(name1)
m2 = self.create_test_map(name2)
m = self.maps.all()['template_ids']
# remove user part
old = set([x.split('@')[1] for x in old])
m = set([x.split('@')[1] for x in m])
diff = m.difference(old)
self.assertEqual(len(diff), 2)
self.assertEqual(True, name1 in diff)
self.assertEqual(True, name2 in diff)
def assertIsPng(self, f):
self.assertEqual(f[1] + f[2] + f[3], 'PNG')
def test_create_auth(self):
self.template['auth']['method'] = 'token'
self.template['auth']['valid_tokens'] = ['at1', 'at2']
m1 = self.create_test_map()
try:
m1.instance('wrong_token', color='#FFF')
self.assertTrue(False)
except ForbiddenException as e:
self.assertEqual(403, e.status_code)
ins = m1.instance('at1')
self.assertTrue(ins.layergroupid() != None)
print m1.name
try:
r = ins.tile(0, 0, 0)
self.assertTrue(False)
except ForbiddenException as e:
self.assertEqual(403, e.status_code)
r = ins.tile(0, 0, 0, token='at2')
f = r.content
self.assertIsPng(f)
def test_attributes(self):
self.template['auth']['method'] = 'token'
self.template['auth']['valid_tokens'] = ['at1', 'at2']
m1 = self.create_test_map()
ins = m1.instance('at1')
try:
a = ins.attribute(0, 1)
self.assertTrue(False)
except ForbiddenException as e:
self.assertEqual(403, e.status_code)
a = ins.attribute(0, 1, 'at1')
self.assertTrue(a != None)
self.assertTrue(a['cartodb_id'] == 1)
class AnonymousMapsAPITest(unittest.TestCase):
def setUp(self):
self.template = copy.deepcopy(template['layergroup'])
self.maps = AnonymousMaps(url, self.template, API_KEY)
self.created_maps = []
def test_create_private(self):
i = self.maps.instance()
self.assertTrue(i.layergroupid() != None)
def test_create_private_no_api_key(self):
self.maps.apiKey(None)
try:
i = self.maps.instance()
self.assertTrue(False)
except ForbiddenException as e:
self.assertEqual(403, e.status_code)
def test_create_public(self):
self.maps.apiKey(None)
self.template['layers'][0]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public"
self.template['layers'][1]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public",
self.maps = AnonymousMaps(url, self.template, API_KEY)
i = self.maps.instance()
self.assertTrue(i.layergroupid() != None)
def test_attributes(self):
self.maps.apiKey(None)
self.template['layers'][0]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public"
self.template['layers'][1]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public",
self.maps = AnonymousMaps(url, self.template, API_KEY)
i = self.maps.instance()
self.assertTrue(i.layergroupid() != None)
a = i.attribute(0, 1, 'at1')
self.assertTrue(a != None)
self.assertTrue(a['cartodb_id'] == 1)
class LayergroupCachingTest(unittest.TestCase):
def setUp(self):
self.template = copy.deepcopy(template['layergroup'])
self.template['layers'][0]['options']['sql'] = "select * from kartones2.tm_world_borders_simpl_0_8_public"
self.template['layers'][1]['options']['sql'] = "select * from kartones2.tm_world_borders_simpl_0_8_public"
self.maps = AnonymousMaps(url, self.template)
self.created_maps = []
self.user = url.split('.')[0].split('//')[1]
def test_create(self):
self.client = CartoDBAPIKey(API_KEY, self.user, '.'.join(url.split('.')[1:]))
i = self.maps.instance_get()
ii = self.maps.instance_get()
self.assertTrue(ii.headers['x-cache'] == "HIT")
self.assertTrue(i.last_updated() == ii.last_updated())
data = self.client.sql('update tm_world_borders_simpl_0_8_public set area = %d where cartodb_id = 1' % random.randint(0, 100))
o = self.maps.instance_get()
self.assertTrue(o.headers['x-cache'] == "MISS")
self.assertTrue(i.last_updated() != o.last_updated())
if __name__ == '__main__':
if len(sys.argv) < 3:
print "usage: python named_maps.py host api_key"
sys.exit()
url = sys.argv[1]
API_KEY = sys.argv[2]
sys.argv = sys.argv[:2]
print url, API_KEY
import logging
requests_log = logging.getLogger("requests")
requests_log.setLevel(logging.DEBUG)
alltests = unittest.TestSuite([
unittest.TestLoader().loadTestsFromTestCase(x)
for x in (MapsAPITest, AnonymousMapsAPITest, LayergroupCachingTest)])
unittest.TextTestRunner(verbosity=2).run(alltests)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment