Skip to content

Instantly share code, notes, and snippets.

@limboinf
Created June 29, 2016 15:24
Show Gist options
  • Save limboinf/ce7c9e648357887839b25e2b0557daa3 to your computer and use it in GitHub Desktop.
Save limboinf/ce7c9e648357887839b25e2b0557daa3 to your computer and use it in GitHub Desktop.
api请求和验证。
# coding=utf-8
"""
Api请求验证
:copyright: (c) 2016 by fangpeng(@beginman.cn).
:license: MIT, see LICENSE for more details.
"""
import uuid
import base64
import struct
import time
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex
# AES key must be either 16(AES-128), 24(AES-192), or 32(AES-256) bytes long
__key__ = "_**!qwasxLingzhixunTeamApp@2016_" # AES key
__crypto_uid_key__ = 0x54c4da0
def gen_sign(api_key, security_key, timestarp, url):
"""生成sign"""
msg = api_key + security_key + str(timestarp) + url
return Prpcrypt().encrypt(msg)
class ClientRequest(object):
"""客户端请求"""
def __init__(self, user_id):
self.user_id = user_id
self.timestarp = int(time.time()) # 一次请求时间戳要保证唯一
def request(self, url, json_data):
token = 'token' # 每次login后服务器生成 token (token <==> uid) 返回给客户端
security_key = self.gen_sid() # 每次请求要生成唯一的
# 生成签名
sign = gen_sign(token, security_key, self.timestarp, url)
req_data = {
'token': token,
'security_key': security_key,
'timestarp': self.timestarp,
'sign': sign
}
req_data.update(json_data)
return req_data
def gen_sid(self):
try:
user_id = int(self.user_id)
except:
return ''
st_uuid = str(uuid.uuid4()).replace('-', '').upper() # 32bytes
st_time = self.timestarp
st_uid = user_id ^ __crypto_uid_key__
key = struct.pack('32sIL', st_uuid, st_time, st_uid)
return base64.b64encode(key)
class APiServiceHandlerRequest(object):
"""服务端api请求基类"""
def __init__(self):
self.user_id = None
def valid_request(self, url, req_data):
# req_data json 数据解析
# 其他处理..
self.valid_req_field(req_data) # 验证请求字段是否齐全
self.valid_timestarp(req_data['timestarp']) # 验证请求时间戳
self.valid_sign(req_data, url) # 验证sign签名
self.valid_security_key(req_data['security_key'],
req_data['timestarp'])
self.valid_token(req_data['token']) # 验证token
print (u'请求验证成功, user_id:%s' % self.user_id)
def valid_req_field(self, req_data, *args):
"""验证请求字段完整性"""
check_fields = args
if not check_fields:
check_fields=('token', 'security_key', 'timestarp', 'sign')
check = map(lambda x: x in req_data, check_fields)
if all(check) is False:
dic = dict(zip(check_fields, check))
missing_fields = ','.join([k for k, v in dic.items() if v is False])
raise ValueError('Missing fields %s' % missing_fields)
def valid_timestarp(self, timestarp, expires=10*60):
"""判断服务器接到请求的时间
服务器时间和参数中的时间戳是否相差很长一段时间(时间间隔自定义如10min)
如果超过则说明该url已经过期
"""
now = int(time.time())
timestarp = int(timestarp)
if now - timestarp > expires:
raise ValueError("The request is out of date.")
def valid_token(self, token):
"""判断token是否有效
根据请求过来的token,查询redis缓存中的uid,
如果获取不到这说明该token已过期。
"""
redis = {self.user_id: 'token'} # 假设是redis服务
if redis.get(self.user_id) != token:
raise ValueError("token不存在")
def valid_security_key(self, security_key, timestarp):
"""通过security_key解析出user_id 和timestarp
1.判断解析出来的timestarp和请求的timestarp值是否相同, 不同则是伪造请求
2.判断user_id是否合法
"""
result = self.get_uid(security_key)
uid = result['uid']
if timestarp != result['timestarp']:
raise ValueError("The request is out of date.")
if uid == 0:
raise ValueError("无效的请求")
self.user_id = uid
def valid_sign(self, req_data, url):
sign = req_data['sign']
token = req_data['token']
security_key = req_data['security_key']
timestarp = req_data['timestarp']
if sign != gen_sign(token, security_key, timestarp, url):
raise ValueError('Sign error.')
def get_uid(self, sid):
try:
b_sid = base64.b64decode(sid)
except:
return 0
if len(b_sid) != 48:
return 0
_, _time, uid = struct.unpack('32sIL', b_sid)
return {"uid": uid ^ __crypto_uid_key__, 'timestarp': _time}
class Prpcrypt():
def __init__(self, key=__key__):
self.key = key
self.mode = AES.MODE_CBC
def encrypt(self,text):
"""加密函数,
如果text不足16位就用空格补足为16位,
如果大于16且不是16的倍数,那就补足为16的倍数。
"""
cryptor = AES.new(self.key, self.mode, b'0000000000000000')
length = 16
count = len(text)
if count < length:
add = (length-count)
elif count > length:
add = (length-(count % length))
text += ('\0' * add) # \0 backspace
self.ciphertext = cryptor.encrypt(text)
# 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题
# 所以这里统一把加密后的字符串转化为16进制字符串
return b2a_hex(self.ciphertext)
def decrypt(self, text):
"""解密函数
解密后去掉补足的空格用strip() 去掉"""
cryptor = AES.new(self.key, self.mode, b'0000000000000000')
plain_text = cryptor.decrypt(a2b_hex(text))
return plain_text.rstrip('\0')
if __name__ == '__main__':
url = "http://beginman.cn/"
# client 正常请求
client = ClientRequest(381710)
req_data = client.request(url, {'user': 'beginman', 'sex': 'male'})
print req_data
# 服务端处理请求
api = APiServiceHandlerRequest()
api.valid_request(url, req_data)
# 伪造请求
req_data = {'token': 'token',
'timestarp': 1467196568,
'user': 'beginman',
'security_key': 'sKey',
'sex': 'male',
'sign': '66ac115febaac233568f93bc71452c77446618d3de41f2ae3f4798006e404531cd7eb5af344ee8ff36c14ad168f53b9a'}
api.valid_request(url, req_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment