Skip to content

Instantly share code, notes, and snippets.

@recall704
Last active September 26, 2016 05:44
Show Gist options
  • Save recall704/9e81a284011d6d903b9cbce716599f7a to your computer and use it in GitHub Desktop.
Save recall704/9e81a284011d6d903b9cbce716599f7a to your computer and use it in GitHub Desktop.
def permission_required(*permission_list):
def _wraps(func):
@tornado.gen.coroutine
def __wraps(*args, **kwargs):
# 1. 检测 token 是否存在
handle = args[0] # tornado.web.RequestHander 函数的 self
token = handle.request.headers.get('X-Auth-Token', "")
if token is None or token.strip("") == "":
handle.set_status(status_code=401)
response_body = {
"message": "token 不存在",
"code": "401",
"response": {"error": "token 不存在"},
"success": False
}
handle.write(response_body)
return
# 2. token 换取 permission list
keystone_url = 'http://172.24.6.155:8000/auth/tokens'
req_header = {
'X-Auth-Token': token,
}
client = AsyncHTTPClient()
response = yield client.fetch(keystone_url, method="GET", body=None, headers=req_header)
try:
body = json.loads(response.body)
permissions = body.get("response", {}).get("token", {}).get("permissions", [])
is_admin = body.get("response", {}).get("token", {}).get("user", {}).get("is_admin", False)
p_list = []
for item in permissions:
tmp_name = item.get("service_name", "") + '.' + item.get("name", "")
p_list.append(tmp_name)
# 取交集
intersection = list(set(permission_list).intersection(p_list))
if (intersection == permission_list) or is_admin:
# 满足权限需求
pass
else:
d = {
"message": "无权限",
"code": "403",
"response": "无权限",
"success": False
}
handle.set_status(status_code=403)
handle.write(d)
return
except:
d = {
"message": "服务器出错",
"code": "500",
"response": "服务器出错",
"success": False
}
handle.set_status(status_code=500)
handle.write(d)
return
ret = func(*args, **kwargs)
raise tornado.gen.Return(ret)
return __wraps
return _wraps
@recall704
Copy link
Author

test

class TestHandler(BaseHandler):

    @permission_required('app.restart', 'app.start')
    @tornado.gen.coroutine
    def get(self):
        self.write('test')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment