Created
June 29, 2016 15:24
-
-
Save limboinf/ce7c9e648357887839b25e2b0557daa3 to your computer and use it in GitHub Desktop.
api请求和验证。
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
Api请求验证 | |
:copyright: (c) 2016 by fangpeng(@beginman.cn). | |
:license: MIT, see LICENSE for more details. | |
""" | |
import uuid | |
import base64 | |
import struct | |
import time | |
from Crypto.Cipher import AES | |
from binascii import b2a_hex, a2b_hex | |
# AES key must be either 16(AES-128), 24(AES-192), or 32(AES-256) bytes long | |
__key__ = "_**!qwasxLingzhixunTeamApp@2016_" # AES key | |
__crypto_uid_key__ = 0x54c4da0 | |
def gen_sign(api_key, security_key, timestarp, url): | |
"""生成sign""" | |
msg = api_key + security_key + str(timestarp) + url | |
return Prpcrypt().encrypt(msg) | |
class ClientRequest(object): | |
"""客户端请求""" | |
def __init__(self, user_id): | |
self.user_id = user_id | |
self.timestarp = int(time.time()) # 一次请求时间戳要保证唯一 | |
def request(self, url, json_data): | |
token = 'token' # 每次login后服务器生成 token (token <==> uid) 返回给客户端 | |
security_key = self.gen_sid() # 每次请求要生成唯一的 | |
# 生成签名 | |
sign = gen_sign(token, security_key, self.timestarp, url) | |
req_data = { | |
'token': token, | |
'security_key': security_key, | |
'timestarp': self.timestarp, | |
'sign': sign | |
} | |
req_data.update(json_data) | |
return req_data | |
def gen_sid(self): | |
try: | |
user_id = int(self.user_id) | |
except: | |
return '' | |
st_uuid = str(uuid.uuid4()).replace('-', '').upper() # 32bytes | |
st_time = self.timestarp | |
st_uid = user_id ^ __crypto_uid_key__ | |
key = struct.pack('32sIL', st_uuid, st_time, st_uid) | |
return base64.b64encode(key) | |
class APiServiceHandlerRequest(object): | |
"""服务端api请求基类""" | |
def __init__(self): | |
self.user_id = None | |
def valid_request(self, url, req_data): | |
# req_data json 数据解析 | |
# 其他处理.. | |
self.valid_req_field(req_data) # 验证请求字段是否齐全 | |
self.valid_timestarp(req_data['timestarp']) # 验证请求时间戳 | |
self.valid_sign(req_data, url) # 验证sign签名 | |
self.valid_security_key(req_data['security_key'], | |
req_data['timestarp']) | |
self.valid_token(req_data['token']) # 验证token | |
print (u'请求验证成功, user_id:%s' % self.user_id) | |
def valid_req_field(self, req_data, *args): | |
"""验证请求字段完整性""" | |
check_fields = args | |
if not check_fields: | |
check_fields=('token', 'security_key', 'timestarp', 'sign') | |
check = map(lambda x: x in req_data, check_fields) | |
if all(check) is False: | |
dic = dict(zip(check_fields, check)) | |
missing_fields = ','.join([k for k, v in dic.items() if v is False]) | |
raise ValueError('Missing fields %s' % missing_fields) | |
def valid_timestarp(self, timestarp, expires=10*60): | |
"""判断服务器接到请求的时间 | |
服务器时间和参数中的时间戳是否相差很长一段时间(时间间隔自定义如10min) | |
如果超过则说明该url已经过期 | |
""" | |
now = int(time.time()) | |
timestarp = int(timestarp) | |
if now - timestarp > expires: | |
raise ValueError("The request is out of date.") | |
def valid_token(self, token): | |
"""判断token是否有效 | |
根据请求过来的token,查询redis缓存中的uid, | |
如果获取不到这说明该token已过期。 | |
""" | |
redis = {self.user_id: 'token'} # 假设是redis服务 | |
if redis.get(self.user_id) != token: | |
raise ValueError("token不存在") | |
def valid_security_key(self, security_key, timestarp): | |
"""通过security_key解析出user_id 和timestarp | |
1.判断解析出来的timestarp和请求的timestarp值是否相同, 不同则是伪造请求 | |
2.判断user_id是否合法 | |
""" | |
result = self.get_uid(security_key) | |
uid = result['uid'] | |
if timestarp != result['timestarp']: | |
raise ValueError("The request is out of date.") | |
if uid == 0: | |
raise ValueError("无效的请求") | |
self.user_id = uid | |
def valid_sign(self, req_data, url): | |
sign = req_data['sign'] | |
token = req_data['token'] | |
security_key = req_data['security_key'] | |
timestarp = req_data['timestarp'] | |
if sign != gen_sign(token, security_key, timestarp, url): | |
raise ValueError('Sign error.') | |
def get_uid(self, sid): | |
try: | |
b_sid = base64.b64decode(sid) | |
except: | |
return 0 | |
if len(b_sid) != 48: | |
return 0 | |
_, _time, uid = struct.unpack('32sIL', b_sid) | |
return {"uid": uid ^ __crypto_uid_key__, 'timestarp': _time} | |
class Prpcrypt(): | |
def __init__(self, key=__key__): | |
self.key = key | |
self.mode = AES.MODE_CBC | |
def encrypt(self,text): | |
"""加密函数, | |
如果text不足16位就用空格补足为16位, | |
如果大于16且不是16的倍数,那就补足为16的倍数。 | |
""" | |
cryptor = AES.new(self.key, self.mode, b'0000000000000000') | |
length = 16 | |
count = len(text) | |
if count < length: | |
add = (length-count) | |
elif count > length: | |
add = (length-(count % length)) | |
text += ('\0' * add) # \0 backspace | |
self.ciphertext = cryptor.encrypt(text) | |
# 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题 | |
# 所以这里统一把加密后的字符串转化为16进制字符串 | |
return b2a_hex(self.ciphertext) | |
def decrypt(self, text): | |
"""解密函数 | |
解密后去掉补足的空格用strip() 去掉""" | |
cryptor = AES.new(self.key, self.mode, b'0000000000000000') | |
plain_text = cryptor.decrypt(a2b_hex(text)) | |
return plain_text.rstrip('\0') | |
if __name__ == '__main__': | |
url = "http://beginman.cn/" | |
# client 正常请求 | |
client = ClientRequest(381710) | |
req_data = client.request(url, {'user': 'beginman', 'sex': 'male'}) | |
print req_data | |
# 服务端处理请求 | |
api = APiServiceHandlerRequest() | |
api.valid_request(url, req_data) | |
# 伪造请求 | |
req_data = {'token': 'token', | |
'timestarp': 1467196568, | |
'user': 'beginman', | |
'security_key': 'sKey', | |
'sex': 'male', | |
'sign': '66ac115febaac233568f93bc71452c77446618d3de41f2ae3f4798006e404531cd7eb5af344ee8ff36c14ad168f53b9a'} | |
api.valid_request(url, req_data) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment