Created
April 19, 2013 23:18
-
-
Save doug/5423903 to your computer and use it in GitHub Desktop.
gae utils
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import calendar
import datetime, time
import functools
import logging
import sys, os, re

from google.appengine.api import memcache, users
from google.appengine.ext import blobstore
from google.appengine.ext import db
# Append the directory that contains this file (symlinks resolved) to the
# import path.
sys.path.append(os.path.dirname(os.path.realpath(__file__)))

# Defaults for the ``ipthrottle`` decorator.
IP_LIMIT = 10  # number of times
IP_THROTTLE_TIME = 60  # time in seconds
IP_THROTTLE_NS = 'ipthrottle'  # base memcache namespace for hit counters

# Loose e-mail syntax check.  The dot in front of the top-level domain is
# escaped so it only matches a literal '.'; unescaped it matched any
# character and let addresses such as "user@examplecom" through.
EMAIL_RE = re.compile(r'[a-zA-Z0-9._%-]+@[a-zA-Z0-9._%-]+\.[a-zA-Z]{2,6}$')
def ipthrottle(limit=IP_LIMIT, time=IP_THROTTLE_TIME, keySuffix=None):
    """Build a decorator that throttles a handler method per client IP.

    A hit counter is kept in memcache under the key
    ``"<method name>:<remote address>"``.  The namespace is
    ``IP_THROTTLE_NS``, extended with ``"|<keySuffix>"`` when ``keySuffix``
    is truthy.

    Args:
        limit: counter value at which calls stop reaching the method.
        time: value passed as memcache's ``time`` argument when the counter
            is first stored.  Note that inside this decorator the name
            shadows the ``time`` module.
        keySuffix: optional suffix for the memcache namespace.

    Returns:
        A decorator.  The decorated method returns the wrapped method's
        result, or None (without calling it) when the caller is throttled.
    """
    def _ipthrottle(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            client_ip = self.request.remote_addr
            if keySuffix:
                namespace = "%s|%s" % (IP_THROTTLE_NS, keySuffix)
            else:
                namespace = IP_THROTTLE_NS
            counter_key = "%s:%s" % (method.__name__, client_ip)

            seen = memcache.get(counter_key, namespace=namespace)
            if seen and seen >= limit:
                # TODO: send email about abusive IP
                logging.debug("IP THROTTLED: %s" % (client_ip,))
                return None

            if seen:
                memcache.incr(counter_key, namespace=namespace)
            else:
                # First hit (or counter missing): start counting at 1.
                memcache.set(counter_key, 1, namespace=namespace, time=time)
            return method(self, *args, **kwargs)
        return wrapper
    return _ipthrottle
def administrator(method):
    """Decorate with this method to restrict to site admins.

    The wrapped method runs only when a user is signed in and
    ``users.is_current_user_admin()`` is true.  Otherwise:

    * GET requests are redirected -- to the login URL when nobody is signed
      in, to "/" when the signed-in user is not an admin;
    * every other request method gets HTTP status 403.

    In both rejection cases the wrapper returns None without calling the
    wrapped method.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        user = users.get_current_user()
        if user and users.is_current_user_admin():
            return method(self, *args, **kwargs)

        if self.request.method == "GET":
            if user:
                self.redirect("/")
            else:
                self.redirect(users.create_login_url())
        else:
            # The previous code did ``raise self.response.set_status(403)``,
            # which raised the call's return value instead of an exception
            # and so failed with a TypeError.  Set the status and stop.
            self.response.set_status(403)
        return None
    return wrapper
# Accepted JSONP callback names: a JavaScript-identifier-like start followed
# by identifier characters, dots or square brackets (e.g. "cb", "ns.cb",
# "handlers[3]").  Anything else is treated as if no callback was supplied.
_JSONP_CALLBACK_RE = re.compile(r'[A-Za-z_$][A-Za-z0-9_$.\[\]]*\Z')


def jsonp(method):
    """Decorate a handler method so its output can be served as JSONP.

    When the request carries a ``callback`` parameter with an accepted name,
    whatever the method writes to ``self.response.out`` is wrapped as
    ``<callback>(...);`` and the Content-Type header is set to
    ``text/javascript``.  Without a callback -- or with one containing
    characters outside the accepted set -- the method runs unwrapped.

    The callback value comes straight from the request and is echoed into
    the response, so it is validated to prevent script injection.

    The wrapper returns None, as the handler's return value was never
    propagated by this decorator.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        callback = self.request.get('callback')
        if not callback or not _JSONP_CALLBACK_RE.match(callback):
            method(self, *args, **kwargs)
            return
        self.response.out.write('%s(' % (callback,))
        method(self, *args, **kwargs)
        self.response.out.write(');')
        self.response.headers['Content-Type'] = 'text/javascript'
    # The decorator previously fell off the end without returning the
    # wrapper, which replaced every decorated method with None.
    return wrapper
def valid_email(email):
    """Return True when ``email`` matches ``EMAIL_RE``, otherwise False.

    The pattern is applied with ``match``, i.e. anchored at the first
    character of ``email``.
    """
    matched = EMAIL_RE.match(email)
    return matched is not None
# Value types that ``to_dict`` copies into its output unchanged.
try:
    SIMPLE_TYPES = (int, long, float, bool, dict, basestring, list)
except NameError:
    # Interpreters without ``long``/``basestring`` (Python 3): ``int``
    # already covers long integers, ``str``/``bytes`` stand in for
    # ``basestring``.
    SIMPLE_TYPES = (int, float, bool, dict, str, bytes, list)


def _to_epoch_ms(value):
    """Return ``value`` (a date or datetime) as integer ms since the epoch."""
    if isinstance(value, datetime.datetime):
        timetuple = value.utctimetuple()
    else:
        # Plain ``datetime.date`` objects have no ``utctimetuple``.
        timetuple = value.timetuple()
    # ``calendar.timegm`` reads the tuple as UTC; ``time.mktime`` would
    # read it as local time and skew the result by the server's UTC offset.
    millis = calendar.timegm(timetuple) * 1000
    # The attribute is ``microsecond`` (singular); dates do not have it.
    return millis + getattr(value, 'microsecond', 0) // 1000


def to_dict(model, only=None, exclude=(), with_id=True):
    """Convert a datastore model instance into a plain dictionary.

    Args:
        model: object exposing ``properties()`` (mapping of property names)
            and ``key()``; its property values are read with ``getattr``.
        only: optional list of property names to export.  Any value that is
            not a list is ignored and all of ``model.properties()`` is
            exported instead.
        exclude: container of property names to leave out.
        with_id: when true, add ``output["id"] = model.key().id()``.

    Returns:
        A dict mapping property names to converted values:
        None and ``SIMPLE_TYPES`` values are copied as-is, dates/datetimes
        become integer milliseconds since the epoch, ``db.GeoPt`` becomes
        ``{'lat': ..., 'lon': ...}``, ``blobstore.BlobInfo`` becomes the
        string form of its key, and ``db.Model`` values are converted
        recursively.

    Raises:
        ValueError: if a property value has none of the supported types.
    """
    if isinstance(only, list):
        keys = only
    else:
        keys = model.properties()

    output = {}
    for key in keys:
        if key in exclude:
            continue
        value = getattr(model, key)
        if value is None or isinstance(value, SIMPLE_TYPES):
            output[key] = value
        elif isinstance(value, datetime.date):
            # Convert date/datetime to ms-since-epoch ("new Date()").
            output[key] = _to_epoch_ms(value)
        elif isinstance(value, db.GeoPt):
            output[key] = {'lat': value.lat, 'lon': value.lon}
        elif isinstance(value, blobstore.BlobInfo):
            # Previously ``to_dict(str(value))``, which tried to treat a
            # string as a model; export the blob key as a string instead.
            output[key] = str(value.key())
        elif isinstance(value, db.Model):
            output[key] = to_dict(value)
        else:
            raise ValueError('cannot encode ' + repr(value))
    if with_id:
        output["id"] = model.key().id()
    return output
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment