Created
June 13, 2010 22:21
-
-
Save dound/437051 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib2, urllib | |
class RecaptchaResponse(object):
    """Result holder for one reCAPTCHA verification attempt.

    ``is_valid`` -- validity flag for the attempt.
    ``error_code`` -- optional error code; defaults to ``None``.
    """

    def __init__(self, is_valid, error_code=None):
        # Plain attribute storage; no validation is performed here.
        self.error_code = error_code
        self.is_valid = is_valid
def submit(recaptcha_challenge_field,
           recaptcha_response_field,
           private_key,
           remoteip):
    """
    Submits a reCAPTCHA request for verification. Returns RecaptchaResponse
    for the request.

    recaptcha_challenge_field -- The value of recaptcha_challenge_field from the form
    recaptcha_response_field -- The value of recaptcha_response_field from the form
    private_key -- your reCAPTCHA private key
    remoteip -- the user's ip address

    An empty challenge or response is rejected locally with the error code
    'incorrect-captcha-sol' without contacting the server. An empty reply
    from the server is reported as 'recaptcha-not-reachable'. If the server
    rejects the solution without sending an error line, error_code is None.
    Exceptions raised by urllib2 while talking to the server propagate to
    the caller.
    """
    # Truthiness already covers both None and the empty string.
    if not (recaptcha_response_field and recaptcha_challenge_field):
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')

    def encode_if_necessary(s):
        # Encode unicode values as UTF-8 byte strings before URL-encoding.
        if isinstance(s, unicode):
            return s.encode('utf-8')
        return s

    params = urllib.urlencode({
        'privatekey': encode_if_necessary(private_key),
        'remoteip': encode_if_necessary(remoteip),
        'challenge': encode_if_necessary(recaptcha_challenge_field),
        'response': encode_if_necessary(recaptcha_response_field),
    })

    # NOTE(review): the private key is posted to a plain "http://" URL.
    # Confirm whether the verify endpoint can be reached over HTTPS.
    request = urllib2.Request(
        url="http://api-verify.recaptcha.net/verify",
        data=params,
        headers={
            "Content-type": "application/x-www-form-urlencoded",
            "User-agent": "reCAPTCHA Python"
        }
    )

    httpresp = urllib2.urlopen(request)
    try:
        return_values = httpresp.read().splitlines()
    finally:
        # Release the connection even when reading the body fails.
        httpresp.close()

    if not return_values:
        # Empty body: there is no verdict line to inspect.
        return RecaptchaResponse(is_valid=False, error_code='recaptcha-not-reachable')

    if return_values[0] == "true":
        return RecaptchaResponse(is_valid=True)

    # The second line carries the error code; tolerate its absence.
    if len(return_values) > 1:
        error_code = return_values[1]
    else:
        error_code = None
    return RecaptchaResponse(is_valid=False, error_code=error_code)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from google.appengine.ext import webapp | |
from gaesessions import get_current_session | |
from rate_limit import RateLimiter, RL_DROP, RL_HANDLE_BUT_SEND_CAPTCHA | |
# Name of the operation being rate limited; also used in the log messages below.
OP_TYPE = 'example'

# Minimum spacing, in seconds, between requests at the base rate.
SECS_BETWEEN_REQUESTS = 2.0

# Number of requests which can be made faster than SECS_BETWEEN_REQUESTS
# (like "tokens" for a token bucket).
MAX_BURST_SIZE = 3

# Shared limiter instance used by the handler below.
EXAMPLE_RL = RateLimiter(
    op_type=OP_TYPE,
    secs_per_op=SECS_BETWEEN_REQUESTS,
    max_tokens=MAX_BURST_SIZE,
)
class Example(webapp.RequestHandler):
    """Sample handler showing how to guard a POST operation with EXAMPLE_RL."""

    def post(self):
        """Rate limit the request by user ID before doing the real work.

        Responds with HTTP 400 when the session has no 'user_id'. Writes
        'captcha-show' to the response when the limiter asks for a captcha,
        whether or not the request itself is dropped.
        """
        # make sure the user is logged in and get their unique ID
        sess = get_current_session()
        if not sess.has_key('user_id'):
            return self.error(400)
        uid = sess['user_id']

        # rate limit this example to guard against bots / malicious users
        rl = EXAMPLE_RL.rate_limit(uid)
        if rl == RL_DROP:
            # logging.warning replaces the deprecated logging.warn alias;
            # arguments are passed lazily so formatting is left to logging.
            logging.warning("RL_DROP: %s %s", OP_TYPE, uid)
            # example: indicate to an AJAX caller that a captcha should be shown
            self.response.out.write('captcha-show')
            return  # drop this request
        elif rl == RL_HANDLE_BUT_SEND_CAPTCHA:
            logging.info("RL_HANDLE_BUT_SEND_CAPTCHA: %s attempt by %s", OP_TYPE, uid)
            self.response.out.write('captcha-show')

        # normal processing code ...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import time | |
from google.appengine.api import memcache | |
from google.appengine.ext import webapp | |
import captcha | |
from gaesessions import get_current_session | |
# Result codes returned by RateLimiter.rate_limit().
RL_DROP = -1                    # the request should be dropped
RL_HANDLE_NORMALLY = 1          # the request should be handled normally
RL_HANDLE_BUT_SEND_CAPTCHA = 2  # handle the request AND issue a captcha
def make_mckey(op_type, uid):
    """Build the memcache key for one (operation type, user) pair."""
    key_parts = (op_type, uid)
    return "rl-%s-%s" % key_parts
def note_captcha_solved(op_type, uid):
    """Delete the memcache entry kept for ``uid`` on ``op_type``."""
    mckey = make_mckey(op_type, uid)
    memcache.delete(mckey)
class RateLimiter(object):
    """Burst-tolerant rate limiter that keeps per-user state in memcache.

    The stored state for each (operation type, user) pair is a tuple of the
    time of the last handled request and the number of burst tokens left.
    """

    def __init__(self, op_type, secs_per_op, max_tokens, send_captcha_token_thresh=1):
        """Initialize a rate-limiter.

        ``op_type`` - a unique identifier of the operation being rate limited (used for part of the memcache key).
        ``secs_per_op`` - minimum time required between operations; must be greater than zero.
        ``max_tokens`` - maximum number of operations which can be done beyond the base rate
        ``send_captcha_token_thresh`` - when we reach this number of tokens, a captcha will be requested.  Setting this greater than zero gives the front-end a chance to make another request(s) before answering a captcha.

        Raises ValueError if ``secs_per_op`` is not positive, or if
        ``max_tokens`` or ``send_captcha_token_thresh`` is negative.
        """
        self.op_type = op_type
        self.secs_per_op = float(secs_per_op)
        self.max_tokens = int(max_tokens)
        self.send_captcha_token_thresh = int(send_captcha_token_thresh)
        # rate_limit() divides by secs_per_op, so zero would raise
        # ZeroDivisionError on every call and a negative value would make the
        # token arithmetic run backwards.  "not (x > 0)" also rejects NaN.
        if not self.secs_per_op > 0:
            raise ValueError('secs_per_op must be greater than 0')
        if self.max_tokens < 0:
            raise ValueError('max_tokens must be at least 0')
        if self.send_captcha_token_thresh < 0:
            raise ValueError('send_captcha_token_thresh must be at least 0')

    def captcha_solved(self, uid):
        """Clear the stored rate-limit state for ``uid`` on this operation type."""
        note_captcha_solved(self.op_type, uid)

    def rate_limit(self, uid, captcha_solved=False):
        """Returns RL_HANDLE_NORMALLY if the request should be handled normally.
        Returns RL_HANDLE_BUT_SEND_CAPTCHA if the request should be handled AND a captcha should be issued.
        Returns RL_DROP if the request arrived sooner than ``secs_per_op`` after the last handled one and no tokens are left.

        ``uid`` - unique identifier for the current user
        ``captcha_solved`` - if True, the rate limiter will be reset for this user and operation type.
        """
        mckey = make_mckey(self.op_type, uid)
        if captcha_solved:
            state = None  # treat the request as a new one since the user is human
        else:
            state = memcache.get(mckey)

        if not state:
            # No stored state: start with a full set of tokens.
            prev_time, tokens_left = 0, self.max_tokens
        else:
            prev_time, tokens_left = state

        ret = RL_HANDLE_NORMALLY
        now = time()
        if prev_time + self.secs_per_op > now:
            # request was made more quickly than we allow: deduct a token
            tokens_left -= 1
            if tokens_left < 0:
                ret = RL_DROP
            elif tokens_left <= self.send_captcha_token_thresh:
                ret = RL_HANDLE_BUT_SEND_CAPTCHA
        else:
            # Regain one token per whole secs_per_op interval elapsed beyond
            # the required spacing, capped at max_tokens.
            extra_time = now - prev_time - self.secs_per_op
            tokens_regained = int(extra_time / self.secs_per_op)
            tokens_left = min(self.max_tokens, tokens_left + tokens_regained)

        if ret != RL_DROP:
            # Dropped requests do not refresh the stored state.
            timeout = (self.max_tokens + 1) * self.secs_per_op
            memcache.set(mckey, (now, tokens_left), timeout)
        return ret
class CaptchaHandler(webapp.RequestHandler):
    """Checks a submitted reCAPTCHA answer for the logged-in user.

    On success the stored rate-limit state for ``op_type`` is deleted via
    note_captcha_solved().  The plain-text response body is one of
    'captcha-not-logged-in', 'captcha-bad-response', 'captcha-ok' or
    'captcha-failed-<error code>'.
    """

    # SECURITY NOTE(review): this private key is committed in source.  It is
    # exposed as a class attribute so a subclass can override it; consider
    # loading it from configuration and rotating the key.
    RECAPTCHA_PRIVATE_KEY = '6LdFE7oSAAAAAPuHb_bHlp4i6omCQkPlWySQjShD'

    def post(self, op_type):
        """Verify the posted captcha fields for the operation ``op_type``."""
        self.response.headers['Content-Type'] = 'text/plain'

        # NOTE(review): this handler reads the session key 'my_id', while the
        # Example handler in this gist reads 'user_id' -- confirm both refer
        # to the same identifier, otherwise the wrong memcache key is cleared.
        session = get_current_session()
        if not session.is_active() or not session.has_key('my_id'):
            return self.response.out.write('captcha-not-logged-in')
        uid = session['my_id']

        challenge = self.request.get('recaptcha_challenge_field')
        response = self.request.get('recaptcha_response_field')
        if not challenge or not response:
            return self.response.out.write('captcha-bad-response')

        resp = captcha.submit(challenge, response, self.RECAPTCHA_PRIVATE_KEY,
                              self.request.remote_addr)
        if resp.is_valid:
            note_captcha_solved(op_type, uid)
            return self.response.out.write('captcha-ok')
        else:
            return self.response.out.write('captcha-failed-%s' % resp.error_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
More information on how to use this code is posted on my blog.