Last active
September 7, 2017 09:03
-
-
Save hustlzp/011be36d7bd950f21c17 to your computer and use it in GitHub Desktop.
Flask permission control
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage 1: as a view decorator
from ..utils.permissions import VisitorPermission | |
@bp.route('/signin', methods=['GET', 'POST'])
@VisitorPermission()
def signin():
    """Sign-in view, guarded by ``VisitorPermission``.

    Renders the sign-in form until it validates on submit; then signs in
    the user attached to the form and redirects to the site index.
    """
    form = SigninForm()
    if not form.validate_on_submit():
        # First visit or invalid submission: show the form (again).
        return render_template('account/signin.html', form=form)
    signin_user(form.user)
    return redirect(url_for('site.index'))
# Usage 2: inside view code
from ..utils.permissions import QuestionAdminPermission | |
@bp.route('/question/<int:uid>/delete')
def delete_question(uid):
    """Delete the question with id ``uid``.

    Responds with 404 when the question does not exist. Otherwise the
    ``QuestionAdminPermission`` for that question is checked inline: on
    failure its deny handler produces the response, on success the
    question is deleted and the client is redirected to the site index.
    """
    question = Question.query.get_or_404(uid)
    permission = QuestionAdminPermission(uid)
    if permission.check():
        db.session.delete(question)
        db.session.commit()
        return redirect(url_for('site.index'))
    return permission.deny()
# Usage 3: inside Jinja2 templates
def register_jinja(app):
    """Register Jinja2 template globals on ``app``.

    Installs a context processor that exposes the ``permissions`` module
    to every template under the name ``g_permissions``.
    """
    from .utils import permissions

    def inject_vars():
        # Variables merged into the template context.
        return {'g_permissions': permissions}

    app.context_processor(inject_vars)
""" | |
{% if g_permissions.QuestionAdminPermission(question.id).check() %} | |
<a class="btn btn-default confirm" data-confirm="确认删除此问题?" | |
href="{{ url_for('qa.delete_question', uid=question.id) }}"> | |
<span class="fa fa-trash-o"></span> 删除 | |
</a> | |
{% endif %} | |
""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
from functools import wraps | |
class Permission(object):
    """Base class for permissions backed by a rule object.

    Subclasses are expected to assign ``self.rule`` to an object whose
    ``run()`` returns a ``(passed, deny_callable)`` pair and which offers a
    ``show()`` method. A permission can be used either as a view decorator
    or inline via ``check()`` followed by ``deny()``.
    """

    def __call__(self, func):
        """Decorate ``func`` so it only runs when the rule passes.

        When the rule fails, the deny handler reported by the rule is
        called and its result is returned instead of calling ``func``.
        """
        @wraps(func)
        def decorator(*args, **kwargs):
            # A decorator instance is created once and shared by every call
            # of the wrapped view, so the deny handler is kept in a local
            # instead of on ``self``; otherwise a concurrent call could
            # overwrite it between the check and the deny.
            passed, deny = self.rule.run()
            if not passed:
                return deny()
            return func(*args, **kwargs)
        return decorator

    def check(self):
        """Run the rule and return whether it passed.

        As a side effect, stores the deny handler reported by the rule as
        ``self.deny`` (``None`` on success) so callers can invoke
        ``permission.deny()`` after a failed check.
        """
        result, self.deny = self.rule.run()
        return result

    def show(self):
        """Print the structure of ``self.rule`` (debugging aid)."""
        self.rule.show()
class Rule(object):
    """A composable permission rule.

    ``rules_list`` is a list of chains; each chain is a list of
    ``(check, deny)`` pairs. A chain passes when every ``check`` in it is
    truthy (logical AND); the rule passes when any chain passes (logical
    OR). Subclasses must override ``check`` and ``deny`` and may override
    ``base`` to prepend another rule's checks.
    """

    def __init__(self):
        self.rules_list = [[(self.check, self.deny)]]
        # If a subclass provides a base rule, its chains are placed
        # upstream of (i.e. evaluated before) this rule's own check.
        base_rule = self.base()
        if base_rule is not None:
            self.rules_list = Rule._and(base_rule.rules_list, self.rules_list)

    def __and__(self, other):
        """Logical AND (``&``).

        Returns a new rule whose chains are this rule's chains each
        followed by ``other``'s chains. Neither operand is modified.
        """
        return self._combined(Rule._and(self.rules_list, other.rules_list))

    def __or__(self, other):
        """Logical OR (``|``).

        Returns a new rule holding this rule's chains followed by
        ``other``'s chains as alternatives. Neither operand is modified.
        """
        return self._combined(self.rules_list + other.rules_list)

    def _combined(self, rules_list):
        """Return a shallow clone of this rule using ``rules_list``.

        The clone is built without calling ``__init__`` so composing rules
        never mutates an operand that may be reused elsewhere.
        """
        clone = self.__class__.__new__(self.__class__)
        clone.__dict__.update(self.__dict__)
        clone.rules_list = rules_list
        return clone

    def show(self):
        """Print one line per chain listing the repr of each check."""
        for chain in self.rules_list:
            print(", ".join([repr(check) for check, deny in chain]))

    def base(self):
        """Return a rule to chain upstream of this one, or ``None``."""
        return None

    def run(self):
        """Evaluate ``rules_list``.

        Returns ``(True, None)`` as soon as one chain passes all of its
        checks. Otherwise returns ``(False, deny)`` where ``deny`` belongs
        to the failing check of the last chain evaluated. Returns ``None``
        if ``rules_list`` is empty.
        """
        failed_result = None
        for chain in self.rules_list:
            for check, deny in chain:
                if not check():
                    failed_result = (False, deny)
                    break
            else:
                # No check in this chain failed.
                return (True, None)
        return failed_result

    def check(self):
        """Test for this rule; subclasses must override."""
        raise NotImplementedError()

    def deny(self):
        """Action to run when this rule's test fails; subclasses must override."""
        raise NotImplementedError()

    @staticmethod
    def _and(rules_list_pre, rules_list_pro):
        """Chain every chain of ``rules_list_pre`` upstream of every chain
        of ``rules_list_pro`` and return the resulting list of chains."""
        return [rule_pre + rule_pro
                for rule_pre in rules_list_pre
                for rule_pro in rules_list_pro]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
from .flask_permission import Permission | |
from .rules import VisitorRule, UserRule, AdminRule, QuestionOwnerRule | |
class VisitorPermission(Permission):
    """Visitor permission, backed by ``VisitorRule``."""

    def __init__(self):
        visitor_rule = VisitorRule()
        self.rule = visitor_rule
class UserPermission(Permission):
    """Registered-user permission, backed by ``UserRule``."""

    def __init__(self):
        user_rule = UserRule()
        self.rule = user_rule
class AdminPermission(Permission):
    """Administrator permission, backed by ``AdminRule``."""

    def __init__(self):
        admin_rule = AdminRule()
        self.rule = admin_rule
class QuestionAdminPermission(Permission):
    """Permission to manage a question.

    Granted when either ``AdminRule`` or ``QuestionOwnerRule`` for the
    given question passes.
    """

    def __init__(self, question_id):
        admin_rule = AdminRule()
        owner_rule = QuestionOwnerRule(question_id)
        self.rule = admin_rule | owner_rule
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
from flask import redirect, url_for, abort, flash, g | |
from ..models import Question | |
from .flask_permission import Rule | |
class VisitorRule(Rule):
    """Rule that passes when ``g.user`` is falsy."""

    def check(self):
        """Return ``True`` when ``g.user`` is falsy."""
        current_user = g.user
        return not current_user

    def deny(self):
        """Redirect to the site index."""
        target = url_for('site.index')
        return redirect(target)
class UserRule(Rule):
    """Rule that passes when ``g.user`` is truthy."""

    def check(self):
        """Return ``g.user`` itself; its truthiness decides the outcome."""
        current_user = g.user
        return current_user

    def deny(self):
        """Flash a message and redirect to the sign-in page."""
        flash('此操作需要登录账户')
        target = url_for('account.signin')
        return redirect(target)
class AdminRule(Rule):
    """Rule for administrators, chained after ``UserRule``."""

    def base(self):
        """Return a ``UserRule`` to be checked before this rule."""
        upstream = UserRule()
        return upstream

    def check(self):
        """Return ``g.user.is_admin``."""
        current_user = g.user
        return current_user.is_admin

    def deny(self):
        """Abort the request with HTTP 403."""
        abort(403)
class QuestionOwnerRule(Rule):
    """Rule for the owner of a given question, chained after ``UserRule``."""

    def __init__(self, question_id):
        # question_id is stored before the base initializer runs.
        self.question_id = question_id
        super(QuestionOwnerRule, self).__init__()

    def base(self):
        """Return a ``UserRule`` to be checked before this rule."""
        upstream = UserRule()
        return upstream

    def check(self):
        """Look up the question and compare its ``user_id`` to ``g.user.id``.

        Returns the falsy lookup result when no question is found,
        otherwise the result of the ownership comparison.
        """
        matching = Question.query.filter(Question.id == self.question_id)
        question = matching.first()
        if not question:
            return question
        return question.user_id == g.user.id

    def deny(self):
        """Abort the request with HTTP 403."""
        abort(403)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment