Skip to content

Instantly share code, notes, and snippets.

@dound
Created June 13, 2010 22:21
Show Gist options
  • Save dound/437051 to your computer and use it in GitHub Desktop.
Save dound/437051 to your computer and use it in GitHub Desktop.
import urllib2, urllib
class RecaptchaResponse(object):
def __init__(self, is_valid, error_code=None):
self.is_valid = is_valid
self.error_code = error_code
def submit (recaptcha_challenge_field,
recaptcha_response_field,
private_key,
remoteip):
"""
Submits a reCAPTCHA request for verification. Returns RecaptchaResponse
for the request
recaptcha_challenge_field -- The value of recaptcha_challenge_field from the form
recaptcha_response_field -- The value of recaptcha_response_field from the form
private_key -- your reCAPTCHA private key
remoteip -- the user's ip address
"""
if not (recaptcha_response_field and recaptcha_challenge_field and
len (recaptcha_response_field) and len (recaptcha_challenge_field)):
return RecaptchaResponse (is_valid = False, error_code = 'incorrect-captcha-sol')
def encode_if_necessary(s):
if isinstance(s, unicode):
return s.encode('utf-8')
return s
params = urllib.urlencode ({
'privatekey': encode_if_necessary(private_key),
'remoteip' : encode_if_necessary(remoteip),
'challenge': encode_if_necessary(recaptcha_challenge_field),
'response' : encode_if_necessary(recaptcha_response_field),
})
request = urllib2.Request (
url = "http://api-verify.recaptcha.net/verify",
data = params,
headers = {
"Content-type": "application/x-www-form-urlencoded",
"User-agent": "reCAPTCHA Python"
}
)
httpresp = urllib2.urlopen (request)
return_values = httpresp.read ().splitlines ();
httpresp.close();
return_code = return_values [0]
if (return_code == "true"):
return RecaptchaResponse (is_valid=True)
else:
return RecaptchaResponse (is_valid=False, error_code = return_values [1])
import logging
from google.appengine.ext import webapp
from gaesessions import get_current_session
from rate_limit import RateLimiter, RL_DROP, RL_HANDLE_BUT_SEND_CAPTCHA
OP_TYPE = 'example'
SECS_BETWEEN_REQUESTS = 2.0
MAX_BURST_SIZE = 3 # number of requests which can be made faster than SECS_BETWEEN_REQUESTS (like "tokens" for a token bucket)
EXAMPLE_RL = RateLimiter(OP_TYPE, SECS_BETWEEN_REQUESTS, MAX_BURST_SIZE)
class Example(webapp.RequestHandler):
def post(self):
# make sure the user is logged in and get their unique ID
sess = get_current_session()
if not sess.has_key('user_id'):
return self.error(400)
uid = sess['user_id']
# rate limit this example to guard against bots / malicious users
rl = EXAMPLE_RL.rate_limit(uid)
if rl == RL_DROP:
logging.warn("RL_DROP: %s %s" % (OP_TYPE, uid))
self.response.out.write('captcha-show') # example: indicate to an AJAX caller than a captcha should be shown
return # drop this request
elif rl == RL_HANDLE_BUT_SEND_CAPTCHA:
logging.info("RL_HANDLE_BUT_SEND_CAPTCHA: %s attempt by %s" % (OP_TYPE, uid))
self.response.out.write('captcha-show')
# normal processing code ...
from time import time
from google.appengine.api import memcache
from google.appengine.ext import webapp
import captcha
from gaesessions import get_current_session
RL_HANDLE_NORMALLY = 1
RL_HANDLE_BUT_SEND_CAPTCHA = 2
RL_DROP = -1
def make_mckey(op_type, uid):
return "rl-%s-%s" % (op_type, uid)
def note_captcha_solved(op_type, uid):
memcache.delete(make_mckey(op_type, uid))
class RateLimiter(object):
def __init__(self, op_type, secs_per_op, max_tokens, send_captcha_token_thresh=1):
"""Initialize a rate-limiter.
``op_type`` - a unique identifier of the operation being rate limited (used for part of the memcache key).
``secs_per_op`` - minimum time required between operations
``max_tokens`` - maximum number of operations which can be done beyond the base rate
``send_captcha_token_thresh`` - when we reach this number of tokens, a captcha will be requested. Setting this greater than zero gives the front-end a chance to make another request(s) before answering a captcha.
"""
self.op_type = op_type
self.secs_per_op = float(secs_per_op)
self.max_tokens = int(max_tokens)
self.send_captcha_token_thresh = int(send_captcha_token_thresh)
if self.max_tokens < 0:
raise ValueError('max_tokens must be at least 0')
if self.send_captcha_token_thresh < 0:
raise ValueError('send_captcha_token_thresh must be at least 0')
def captcha_solved(self, uid):
note_captcha_solved(self.op_type, uid)
def rate_limit(self, uid, captcha_solved=False):
"""Returns RL_HANDLE_NORMALLY if the request should be handled normally.
Returns RL_HANDLE_BUT_SEND_CAPTCHA if the request should be handled AND a captcha should be issued.
Returns RL_DROP if the request should be dropped because an outstanding captcha challenge has not been solved.
``uid`` - unique identifier for the current user
``captcha_solved`` - if True, the rate limiter will be reset for this user and operation type.
"""
mckey = make_mckey(self.op_type, uid)
if captcha_solved:
state = None # treat the request as a new one since the user is human
else:
state = memcache.get(mckey)
if not state:
prev_time, tokens_left = 0, self.max_tokens
else:
prev_time, tokens_left = state
ret = RL_HANDLE_NORMALLY
now = time()
if prev_time + self.secs_per_op > now:
# request was made more quickly than we allow: deduct a token
tokens_left -= 1
if tokens_left < 0:
ret = RL_DROP
elif tokens_left <= self.send_captcha_token_thresh:
ret = RL_HANDLE_BUT_SEND_CAPTCHA
else:
extra_time = now - prev_time - self.secs_per_op
tokens_regained = int(extra_time / self.secs_per_op)
tokens_left = min(self.max_tokens, tokens_left+tokens_regained)
if ret != RL_DROP:
timeout = (self.max_tokens+1) * self.secs_per_op
memcache.set(mckey, (now, tokens_left), timeout)
return ret
class CaptchaHandler(webapp.RequestHandler):
def post(self, op_type):
self.response.headers['Content-Type'] = 'text/plain'
session = get_current_session()
if not session.is_active() or not session.has_key('my_id'):
return self.response.out.write('captcha-not-logged-in')
uid = session['my_id']
challenge = self.request.get('recaptcha_challenge_field')
response = self.request.get('recaptcha_response_field')
if not challenge or not response:
return self.response.out.write('captcha-bad-response')
resp = captcha.submit(challenge, response, '6LdFE7oSAAAAAPuHb_bHlp4i6omCQkPlWySQjShD', self.request.remote_addr)
if resp.is_valid:
note_captcha_solved(op_type, uid)
return self.response.out.write('captcha-ok')
else:
return self.response.out.write('captcha-failed-%s' % resp.error_code)
@dound
Copy link
Author

dound commented Jun 13, 2010

More information on to use this code is posted on my blog.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment