Skip to content

Instantly share code, notes, and snippets.

@swshan
Created June 17, 2016 07:49
Show Gist options
  • Save swshan/27e71db4f9fcf16c182e22cea4b95329 to your computer and use it in GitHub Desktop.
Save swshan/27e71db4f9fcf16c182e22cea4b95329 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# encoding: utf-8
import os
import time
import logging
import hashlib
from sqlalchemy import create_engine, ForeignKey, Column, Integer, String, Text, DateTime,\
and_, or_, SmallInteger, Float, DECIMAL, desc, asc, Table, join, event
from sqlalchemy.orm import relationship, backref, sessionmaker, scoped_session, aliased, mapper
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
from sqlalchemy.orm.collections import attribute_mapped_collection
from flask import Flask
from flask import request, jsonify, make_response
from flask import flash, session
import redis
redis_store = redis.Redis(host='localhost', db=4)
engine = create_engine('mysql://root:password@localhost:3306/user', echo=True)
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = Session()
app = Flask(__name__)
from datetime import timedelta
from flask import make_response, request, current_app
from functools import update_wrapper
def crossdomain(origin=None, methods=None, headers=None,
max_age=21600, attach_to_all=True,
automatic_options=True):
if methods is not None:
methods = ', '.join(sorted(x.upper() for x in methods))
if headers is not None and not isinstance(headers, basestring):
headers = ', '.join(x.upper() for x in headers)
if not isinstance(origin, basestring):
origin = ', '.join(origin)
if isinstance(max_age, timedelta):
max_age = max_age.total_seconds()
def get_methods():
if methods is not None:
return methods
options_resp = current_app.make_default_options_response()
return options_resp.headers['allow']
def decorator(f):
def wrapped_function(*args, **kwargs):
if automatic_options and request.method == 'OPTIONS':
resp = current_app.make_default_options_response()
else:
resp = make_response(f(*args, **kwargs))
if not attach_to_all and request.method != 'OPTIONS':
return resp
h = resp.headers
h['Access-Control-Allow-Origin'] = origin
h['Access-Control-Allow-Methods'] = get_methods()
h['Access-Control-Max-Age'] = str(max_age)
if headers is not None:
h['Access-Control-Allow-Headers'] = headers
return resp
f.provide_automatic_options = False
return update_wrapper(wrapped_function, f)
return decorator
def allow_origin(f):
@wraps(f)
def wrapped(*args, **kwargs):
rv = f(*args, **kwargs)
response = make_response(rv)
response.headers['Access-Control-Allow-Origin'] = '192.168.1.66'
return response
return wrapped
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key = True)
email = Column(String(32), index = True)
password = Column(String(128))
def __init__(self, email, password):
self.email = email
#self.password = password
self.password = hashlib.sha1(password).hexdigest()
def __repr__(self):
return "User('%s', '%s')" % \
(self.email, self.password)
@app.errorhandler(404)
def not_found(error):
return make_response(jsonify({'error': '404 Not found'}), 404)
# 验证token合法性以及是否过期
from functools import wraps
def token_required(func):
pass
# User Login
@app.route('/public/api/user/login/', methods=['GET','POST','OPTIONS'])
def login():
email = request.get_json(force=True)['email']
password = request.get_json(force=True)['password']
print 'diag'
print email
print password
#print dir(request)
user = session.query(User).filter_by(email=email).first()
userpassword_hash = hashlib.sha1(password).hexdigest()
if not user:
return jsonify({'code': 1, 'message': 'user doe not exist'})
if user.password != userpassword_hash:
return jsonify({'code': 1, 'message': 'password error'})
# 就用md5函数
# a session identifier, session ID or session token
get_md5 = hashlib.md5()
get_md5.update(email)
get_md5.update(password)
get_md5.update(str(int(time.time())))
user_token = get_md5.hexdigest()
# 就用md5函数
# a session identifier, session ID or session token
get_md5 = hashlib.md5()
get_md5.update(email)
get_md5.update(password)
get_md5.update(str(int(time.time())))
user_token = get_md5.hexdigest()
# storage purpose
redis_store.hmset('user:%s' % email, {'token': user_token, 'email': email })
redis_store.set('token:%s' % user_token, email)
redis_store.expire('token:%s' % user_token, 3600*24*30 )
response = jsonify({"ok": "true", "data": {"user_id": user.id, "session_id": user_token}})
response.headers.add('Access-Control-Allow-Origin', '*')
return response
@app.route('/api/user/logout')
def logout():
pass
# User Registration
@app.route('/public/api/user/register/', methods = ['POST'])
@crossdomain(origin='*')
def new_user():
email = request.args.get("email", "")
password = request.args.get("password", "")
# if username is None or password is None:
# abort(400)
# if session.query(User).filter_by(username='username').first() is not None:
# abort(400) # existing user
content = User(email=email, password=password)
session.add(content)
try:
session.commit()
except:
session.rollback()
raise
return jsonify({ 'email': email, 'password': password }), 201,
# 登錄限制的資源
@app.route('/api/resource')
def get_resource():
return jsonify({ 'data': 'Hello, %s!' % g.username })
# welcome msg
@app.route('/')
def welcome():
return jsonify({ 'data': 'Hello '})
if __name__ == '__main__':
Base.metadata.create_all(engine)
###
logging.getLogger('flask_cors').level = logging.DEBUG
###
app.run(debug=True, host='192.168.1.66', port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment