Skip to content

Instantly share code, notes, and snippets.

@doug
Created April 19, 2013 23:18
Show Gist options
  • Save doug/5423903 to your computer and use it in GitHub Desktop.
Save doug/5423903 to your computer and use it in GitHub Desktop.
gae utils
import calendar
import logging
import functools
import datetime, time
import sys, os, re

from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.api import memcache, users
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
# Defaults for the per-IP throttle decorator.
IP_LIMIT = 10  # number of times
IP_THROTTLE_TIME = 60  # time in seconds
IP_THROTTLE_NS = 'ipthrottle'  # memcache namespace holding the hit counters

# Pragmatic (not RFC-complete) e-mail pattern; intended for use with
# ``.match()`` so it is anchored at the start of the string.
#   * the dot before the TLD is escaped -- unescaped it matched ANY character,
#     so strings like "user@exampleXcom" were accepted;
#   * "+" is allowed in the local part (e.g. "name+tag@example.com");
#   * the TLD may be up to 63 letters (the DNS label limit) instead of 6;
#   * ``\Z`` is used instead of ``$`` so a trailing newline is rejected.
EMAIL_RE = re.compile(
    r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9._%-]+\.[a-zA-Z]{2,63}\Z')
def ipthrottle(limit=IP_LIMIT, time=IP_THROTTLE_TIME, keySuffix=None):
    """Build a decorator that throttles a handler method per client IP.

    A hit counter is kept in memcache under the key "<method name>:<ip>".
    The first hit stores the counter with an expiry of ``time`` seconds;
    later hits increment it. Once the stored count has reached ``limit``
    the wrapped method is not called and the wrapper returns None.

    Args:
        limit: stored hit count at which calls start being dropped.
        time: expiry, in seconds, given to a newly created counter.
        keySuffix: optional string appended to the memcache namespace
            ("ipthrottle|<keySuffix>") to keep counters separate.
    """
    def _ipthrottle(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            method_name = method.__name__
            client_ip = self.request.remote_addr

            namespace = IP_THROTTLE_NS
            if keySuffix:
                namespace = "%s|%s" % (IP_THROTTLE_NS, keySuffix)
            counter_key = "%s:%s" % (method_name, client_ip)

            count = memcache.get(counter_key, namespace=namespace)
            if not count:
                # No counter yet: start one with the configured expiry.
                memcache.set(counter_key, 1, namespace=namespace, time=time)
            elif count >= limit:
                # TODO: send email about abusive IP
                logging.debug("IP THROTTLED: %s" % (client_ip,))
                return
            else:
                memcache.incr(counter_key, namespace=namespace)

            return method(self, *args, **kwargs)
        return wrapper
    return _ipthrottle
def administrator(method):
    """Decorate with this method to restrict to site admins.

    When the current user is signed in and is an admin, the wrapped method
    is called and its result returned. Otherwise:

      * GET requests are redirected -- to the login URL when nobody is
        signed in, or to "/" when the user is signed in but not an admin;
      * any other request method gets a 403 status.

    In both rejection cases the wrapped method is not called and the
    wrapper returns None.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        user = users.get_current_user()
        if user and users.is_current_user_admin():
            return method(self, *args, **kwargs)

        if self.request.method != "GET":
            # The original did `raise self.response.set_status(403)`, i.e. it
            # raised the call's return value (None), which is a TypeError and
            # surfaced as a 500. Set the status and stop instead.
            self.response.set_status(403)
            return

        if not user:
            self.redirect(users.create_login_url())
        else:
            self.redirect("/")
        return
    return wrapper
def jsonp(method):
    """Decorate a handler method so its output can be served as JSONP.

    If the request carries a non-empty ``callback`` parameter, whatever the
    wrapped method writes to the response is wrapped as ``<callback>(...);``
    and the Content-Type header is set to ``text/javascript``. Without a
    callback the method runs unchanged. The wrapper itself returns None.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        callback = self.request.get('callback')
        if not callback:
            method(self, *args, **kwargs)
            return

        # NOTE(review): `callback` comes straight from the request and is
        # echoed into a script response unvalidated; consider restricting it
        # to a JavaScript identifier pattern.
        self.response.out.write('%s(' % (callback,))
        method(self, *args, **kwargs)
        self.response.out.write(');')
        self.response.headers['Content-Type'] = 'text/javascript'

    # The original never returned the wrapper, so every decorated method was
    # replaced by None.
    return wrapper
def valid_email(email):
    """Return True if ``email`` matches EMAIL_RE from its first character."""
    # A match object is always truthy; a failed match yields None.
    return bool(EMAIL_RE.match(email))
# Value types that to_dict() copies into its output unchanged.
SIMPLE_TYPES = (int, long, float, bool, dict, basestring, list)


def _to_epoch_ms(value):
    """Return ``value`` (a date or datetime) as integer ms since the epoch."""
    if isinstance(value, datetime.datetime):
        # timegm() reads the tuple as UTC; mktime() would have applied the
        # host's local timezone to a tuple that is already in UTC.
        seconds = calendar.timegm(value.utctimetuple())
        # The attribute is `microsecond` (singular); the previous lookup of
        # 'microseconds' always fell back to 0.
        return seconds * 1000 + value.microsecond // 1000
    # Plain dates have no utctimetuple(); treat them as midnight UTC.
    return calendar.timegm(value.timetuple()) * 1000


def to_dict(model, only=None, exclude=(), with_id=True):
    """Convert a datastore model instance into a plain dict.

    Args:
        model: object exposing ``properties()`` and ``key()`` (a db.Model).
        only: list of property names to include. Any non-list value
            (including None) means "all of ``model.properties()``".
        exclude: container of property names to skip.
        with_id: when true, add an "id" entry taken from
            ``model.key().id()``.

    Returns:
        A dict mapping property names to converted values:
        None and SIMPLE_TYPES are kept as-is, dates/datetimes become
        milliseconds since the epoch, GeoPt becomes {'lat', 'lon'},
        BlobInfo becomes its blob key as a string, and nested models are
        converted recursively (with default options).

    Raises:
        ValueError: if a property value has a type that is not handled.
    """
    if isinstance(only, list):
        keys = only
    else:
        keys = model.properties()  # iterating the mapping yields its keys

    output = {}
    for key in keys:
        if key in exclude:
            continue
        value = getattr(model, key)
        if value is None or isinstance(value, SIMPLE_TYPES):
            output[key] = value
        elif isinstance(value, datetime.date):
            # Convert date/datetime to ms-since-epoch ("new Date()").
            output[key] = _to_epoch_ms(value)
        elif isinstance(value, db.GeoPt):
            output[key] = {'lat': value.lat, 'lon': value.lon}
        elif isinstance(value, blobstore.BlobInfo):
            # Previously recursed into to_dict(str(value)), which fails
            # because a str has no properties(); emit the blob key instead.
            output[key] = str(value.key())
        elif isinstance(value, db.Model):
            output[key] = to_dict(value)
        else:
            raise ValueError('cannot encode ' + repr(value))

    if with_id:
        output["id"] = model.key().id()
    return output
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment