-
-
Save dersteppenwolf/282252a9179ba86cc4b6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import random | |
import urllib | |
class MapInstance(object): | |
def __init__(self, definition, url, headers=None): | |
self.url = url | |
self.definition = definition | |
self.headers = headers | |
def attribute(self, layer, feature_id, token=None): | |
url = [ | |
self.url, | |
"api", "v1", "map", | |
self.layergroupid(), | |
layer, | |
'attributes', | |
feature_id | |
] | |
url = '/'.join(map(str, url)) | |
if token: | |
url += '?auth_token=' + token | |
r = requests.get(url) | |
if r.status_code != 200: | |
raise ForbiddenException(r.status_code, r.content) | |
return r.json() | |
def tile(self, z, x, y, layer=None, format='png', token=None, cache=True): | |
tile = [ | |
self.url, | |
"api", "v1", "map", | |
self.layergroupid(), | |
] | |
if layer != None: | |
tile.append(layer) | |
tile += [ z, x, str(y) + "." + format ] | |
url = '/'.join(map(str, tile)) | |
if token: | |
url += '?auth_token=' + token | |
if not cache: | |
if token: | |
url += "&" | |
else: | |
url += "?" | |
url += "t=" + str(random.random()) | |
r = requests.get(url) | |
if r.status_code != 200: | |
raise ForbiddenException(r.status_code, r.content) | |
return r | |
def layergroupid(self): | |
#return self.definition['layergroupid'].split('@')[-1] | |
return self.definition['layergroupid'] | |
def last_updated(self): | |
return self.definition['last_updated'] | |
def __str__(self): | |
return str(self.definition) | |
class ForbiddenException(Exception): | |
def __init__(self, status_code, info): | |
self.status_code = status_code | |
self.info = info | |
def __str__(self): | |
return repr(self.status_code) | |
class NamedMap(object): | |
def __init__(self, name, tmpl, parent): | |
self.name = name | |
self.tmpl = tmpl | |
self.parent = parent | |
def template(self): | |
return self.tmpl | |
def delete(self): | |
return requests.delete(self.url() + '?api_key=' + self.parent.api_key) | |
def update(): | |
r = requests.put(self.url + '?api_key=' + self.parent.api_key, | |
data=json.dumps(self.tmpl), | |
headers=self.parent.headers) | |
return self | |
def url(self): | |
return self.parent.url + "/" + self.name | |
def instance(self, token=None, *args, **kwargs): | |
ins_url = self.url () | |
if token: | |
ins_url += "?auth_token=" + token | |
r = requests.post(ins_url, | |
data=json.dumps(kwargs), | |
headers=self.parent.headers) | |
if r.status_code != 200: | |
raise ForbiddenException(r.status_code, r.content) | |
return MapInstance(r.json(), self.parent.host) | |
def __str__(self): | |
return str(self.tmpl) | |
class AnonymousMaps(object): | |
def __init__(self, url, tmpl, api_key=None): | |
self.headers = {'Content-Type': 'application/json'} | |
self.host = url | |
self.url = url + '/api/v1/map' | |
self.api_key = api_key | |
self.tmpl = tmpl | |
def apiKey(self, k): | |
self.api_key = k | |
return self | |
def instance(self): | |
url = self.url | |
if self.api_key: | |
url += '?api_key=' + self.api_key | |
r = requests.post(url, | |
data=json.dumps(self.tmpl), | |
headers=self.headers) | |
if r.status_code == 200: | |
return MapInstance(r.json(), self.host) | |
raise ForbiddenException(r.status_code, r.json()) | |
def instance_get(self, *args, **kwargs): | |
ins_url = self.url | |
ins_url += '?' + urllib.urlencode({ "config": json.dumps(self.tmpl) }) | |
print ins_url | |
r = requests.get(ins_url, | |
headers=self.headers) | |
if r.status_code != 200: | |
raise ForbiddenException(r.status_code, r.json()) | |
return MapInstance(r.json(), self.host, r.headers) | |
class NamedMaps(object): | |
def __init__(self, url, api_key): | |
self.headers = {'content-type': 'application/json'} | |
self.host = url | |
self.url = url + '/api/v1/map/named' | |
self.api_key = api_key | |
def create(self, tmpl): | |
r = requests.post(self.url + '?api_key=' + self.api_key, | |
data=json.dumps(tmpl), | |
headers=self.headers) | |
if r.status_code == 200: | |
return NamedMap(r.json()['template_id'], r.json(), self) | |
return None | |
def all(self): | |
r = requests.get(self.url + '?api_key=' + self.api_key, headers=self.headers) | |
return r.json() | |
def get(self, name): | |
r = requests.get(self.url + "/" + name + '?api_key=' + self.api_key, headers=self.headers) | |
if r.status_code == 200: | |
return NamedMap(name, r.json(), self) | |
return None | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# this allows test named maps in an server | |
# the server should have a table named world_borders | |
import unittest | |
import sys | |
import copy | |
import random | |
from cartodb import CartoDBOAuth as CartoDB, CartoDBException, CartoDBAPIKey | |
from named_maps import * | |
template = { | |
'version': '0.0.1', | |
'name': 'testing3', | |
'auth': { | |
'method': 'open', | |
}, | |
'layergroup': { | |
"version": "1.0.1", | |
"layers": [{ | |
"type": "cartodb", | |
"options": { | |
"cartocss_version": "2.1.1", | |
"cartocss": "#layer{polygon-fill: #000;line-color: red; }", | |
"sql": "select * from tm_world_borders_simpl_0_8", | |
"interactivity": "cartodb_id", | |
"attributes": { "id": "cartodb_id", "columns": ["iso3", "cartodb_id"] } | |
} | |
}, | |
{ | |
"type": "torque", | |
"options": { | |
"cartocss_version": "2.1.1", | |
"cartocss": ' Map{ -torque-time-attribute: "cartodb_id";-torque-aggregation-function: "count(cartodb_id)";-torque-frame-count: 760;-torque-resolution: 2}', | |
"sql": "select cartodb_id,the_geom, st_centroid(the_geom_webmercator) as the_geom_webmercator from tm_world_borders_simpl_0_8" | |
#"sql": "select * from tm_world_borders_simpl_0_8" | |
} | |
} | |
] | |
} | |
} | |
API_KEY = None | |
url = None | |
#API_KEY = '8bcf70997dc1d2acd7520638e226271d6589e579' | |
#url = 'http://dev.localhost.lan:8181' | |
class MapsAPITest(unittest.TestCase): | |
def setUp(self): | |
self.maps = NamedMaps(url, API_KEY) | |
self.template = copy.deepcopy(template) | |
self.created_maps = [] | |
def tearDown(self): | |
for x in self.created_maps: | |
x.delete() | |
def random_name(self): | |
return 'map_test_' + str(random.randint(0, sys.maxint)) | |
def create_test_map(self, name=None): | |
if not name: | |
name = self.random_name() | |
self.template['name'] = name | |
m1 = self.maps.create(self.template) | |
self.created_maps.append(m1) | |
return m1 | |
def test_create(self): | |
m1 = self.create_test_map() | |
self.assertTrue(self.template['name'] in m1.name) | |
def test_instance(self): | |
m1 = self.create_test_map() | |
ins = m1.instance()#'auth_token2', color='#FFF') | |
self.assertTrue(len(ins.layergroupid()) > 10) | |
def test_tile(self): | |
m1 = self.create_test_map() | |
ins = m1.instance('auth_token2', color='#FFF') | |
r = ins.tile(0, 0, 0) | |
f = r.content | |
self.assertIsPng(f) | |
r = ins.tile(0, 0, 0, layer=1, format="json.torque") | |
f = r.json() | |
self.assertTrue(len(f) > 0) | |
self.assertTrue('x__uint8' in f[0]) | |
def test_list(self): | |
old = self.maps.all()['template_ids'] | |
name1 = self.random_name() | |
name2 = self.random_name() | |
m1 = self.create_test_map(name1) | |
m2 = self.create_test_map(name2) | |
m = self.maps.all()['template_ids'] | |
# remove user part | |
old = set([x.split('@')[1] for x in old]) | |
m = set([x.split('@')[1] for x in m]) | |
diff = m.difference(old) | |
self.assertEqual(len(diff), 2) | |
self.assertEqual(True, name1 in diff) | |
self.assertEqual(True, name2 in diff) | |
def assertIsPng(self, f): | |
self.assertEqual(f[1] + f[2] + f[3], 'PNG') | |
def test_create_auth(self): | |
self.template['auth']['method'] = 'token' | |
self.template['auth']['valid_tokens'] = ['at1', 'at2'] | |
m1 = self.create_test_map() | |
try: | |
m1.instance('wrong_token', color='#FFF') | |
self.assertTrue(False) | |
except ForbiddenException as e: | |
self.assertEqual(403, e.status_code) | |
ins = m1.instance('at1') | |
self.assertTrue(ins.layergroupid() != None) | |
print m1.name | |
try: | |
r = ins.tile(0, 0, 0) | |
self.assertTrue(False) | |
except ForbiddenException as e: | |
self.assertEqual(403, e.status_code) | |
r = ins.tile(0, 0, 0, token='at2') | |
f = r.content | |
self.assertIsPng(f) | |
def test_attributes(self): | |
self.template['auth']['method'] = 'token' | |
self.template['auth']['valid_tokens'] = ['at1', 'at2'] | |
m1 = self.create_test_map() | |
ins = m1.instance('at1') | |
try: | |
a = ins.attribute(0, 1) | |
self.assertTrue(False) | |
except ForbiddenException as e: | |
self.assertEqual(403, e.status_code) | |
a = ins.attribute(0, 1, 'at1') | |
self.assertTrue(a != None) | |
self.assertTrue(a['cartodb_id'] == 1) | |
class AnonymousMapsAPITest(unittest.TestCase): | |
def setUp(self): | |
self.template = copy.deepcopy(template['layergroup']) | |
self.maps = AnonymousMaps(url, self.template, API_KEY) | |
self.created_maps = [] | |
def test_create_private(self): | |
i = self.maps.instance() | |
self.assertTrue(i.layergroupid() != None) | |
def test_create_private_no_api_key(self): | |
self.maps.apiKey(None) | |
try: | |
i = self.maps.instance() | |
self.assertTrue(False) | |
except ForbiddenException as e: | |
self.assertEqual(403, e.status_code) | |
def test_create_public(self): | |
self.maps.apiKey(None) | |
self.template['layers'][0]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public" | |
self.template['layers'][1]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public", | |
self.maps = AnonymousMaps(url, self.template, API_KEY) | |
i = self.maps.instance() | |
self.assertTrue(i.layergroupid() != None) | |
def test_attributes(self): | |
self.maps.apiKey(None) | |
self.template['layers'][0]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public" | |
self.template['layers'][1]['options']['sql'] = "select * from tm_world_borders_simpl_0_8_public", | |
self.maps = AnonymousMaps(url, self.template, API_KEY) | |
i = self.maps.instance() | |
self.assertTrue(i.layergroupid() != None) | |
a = i.attribute(0, 1, 'at1') | |
self.assertTrue(a != None) | |
self.assertTrue(a['cartodb_id'] == 1) | |
class LayergroupCachingTest(unittest.TestCase): | |
def setUp(self): | |
self.template = copy.deepcopy(template['layergroup']) | |
self.template['layers'][0]['options']['sql'] = "select * from kartones2.tm_world_borders_simpl_0_8_public" | |
self.template['layers'][1]['options']['sql'] = "select * from kartones2.tm_world_borders_simpl_0_8_public" | |
self.maps = AnonymousMaps(url, self.template) | |
self.created_maps = [] | |
self.user = url.split('.')[0].split('//')[1] | |
def test_create(self): | |
self.client = CartoDBAPIKey(API_KEY, self.user, '.'.join(url.split('.')[1:])) | |
i = self.maps.instance_get() | |
ii = self.maps.instance_get() | |
self.assertTrue(ii.headers['x-cache'] == "HIT") | |
self.assertTrue(i.last_updated() == ii.last_updated()) | |
data = self.client.sql('update tm_world_borders_simpl_0_8_public set area = %d where cartodb_id = 1' % random.randint(0, 100)) | |
o = self.maps.instance_get() | |
self.assertTrue(o.headers['x-cache'] == "MISS") | |
self.assertTrue(i.last_updated() != o.last_updated()) | |
if __name__ == '__main__': | |
if len(sys.argv) < 3: | |
print "usage: python named_maps.py host api_key" | |
sys.exit() | |
url = sys.argv[1] | |
API_KEY = sys.argv[2] | |
sys.argv = sys.argv[:2] | |
print url, API_KEY | |
import logging | |
requests_log = logging.getLogger("requests") | |
requests_log.setLevel(logging.DEBUG) | |
alltests = unittest.TestSuite([ | |
unittest.TestLoader().loadTestsFromTestCase(x) | |
for x in (MapsAPITest, AnonymousMapsAPITest, LayergroupCachingTest)]) | |
unittest.TextTestRunner(verbosity=2).run(alltests) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment