Last active
December 30, 2015 01:08
-
-
Save ryonsherman/266da75fa0826c5d4eb7 to your computer and use it in GitHub Desktop.
Example Flask implementation of Redis, MongoDB, Jinja, uWSGI, and Web Sockets.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python2 | |
| import os | |
| import glob | |
| import flask | |
| import logging | |
| import humongolus | |
| from flask import Flask | |
| from redis import StrictRedis | |
| from sider.session import Session | |
| from jinja2 import FileSystemLoader | |
| from flask.ext.pymongo import PyMongo | |
| from pymongo.connection import Connection | |
| from lib.middleware.uWSGI import uWSGI | |
| from lib.middleware.Sockets import Sockets | |
| app = Flask(__name__) | |
| app.jinja_loader = FileSystemLoader(os.path.join(os.path.dirname(__file__), 'views')) | |
| app.debug = True | |
| if app.debug: | |
| from weberror.evalexception import make_eval_exception | |
| app.wsgi_app = make_eval_exception(app.wsgi_app, {}) | |
| else: | |
| from weberror.evalexception import make_error_middleware | |
| app.wsgi_app = make_error_middleware(app.wsgi_app, {}) | |
| app.sockets = Sockets(app) | |
| app.cache = Session(StrictRedis()) | |
| app.db = PyMongo(app) | |
| app.db.orm = humongolus | |
| app.db.orm.settings(logging.getLogger(__name__), Connection()) | |
| routes = glob.glob('{0}/routes/*.py'.format(os.path.dirname(__file__))) | |
| routes = [os.path.basename(f)[:-3] for f in routes] | |
| for route in routes: | |
| __import__('routes.{0}'.format(route), fromlist=route) | |
| app = uWSGI(app) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class SocketMiddleware(object): | |
| def __init__(self, app, socket): | |
| self.app = app | |
| self.socket = socket | |
| def __call__(self, environ, start_response): | |
| route = environ['PATH_INFO'] | |
| if route not in self.socket.routes: | |
| return self.app(environ, start_response) | |
| handler = self.socket.routes[route] | |
| socket = environ.get('wsgi.websocket', None) | |
| if not socket: | |
| raise Exception("IMPLEMENT 400 ERROR!") | |
| handler(socket) | |
| class Sockets(object): | |
| routes = {} | |
| def __init__(self, app): | |
| app.wsgi_app = SocketMiddleware(app.wsgi_app, self) | |
| def route(self, rule, **opts): | |
| def decorator(func): | |
| endpoint = opts.pop('endpoint', None) | |
| self.routes[rule] = func | |
| return decorator |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import uwsgi | |
| import socket | |
| from geventwebsocket.handler import WebSocketHandler | |
| from geventwebsocket.python_fixes import makefile | |
| class WSGIMiddlewareHandler(WebSocketHandler): | |
| def __init__(self, environ, start_response, application): | |
| self.environ = environ | |
| self.socket = socket.fromfd(uwsgi.connection_fd(), socket.AF_INET, socket.SOCK_STREAM) | |
| self.rfile = makefile(self.socket) | |
| self.application = application | |
| self.start_response = start_response | |
| self.request_version = environ['SERVER_PROTOCOL'] | |
| def log_request(self): | |
| pass | |
| def uWSGI(app): | |
| def application(environ, start_response): | |
| header = environ.get('HTTP_UPGRADE', '').lower() | |
| if header != 'websocket': | |
| return app(environ, start_response) | |
| header = environ.get('HTTP_CONNECTION', '').lower() | |
| if 'upgrade' in header: | |
| handler = WSGIMiddlewareHandler(environ, start_response, app) | |
| return handler._handle_websocket() | |
| return application |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment