Skip to content

Instantly share code, notes, and snippets.

@jianingy
Last active November 13, 2018 05:44
Show Gist options
  • Save jianingy/deb16c0ba1b54c6ec63e6fcce9808807 to your computer and use it in GitHub Desktop.
Save jianingy/deb16c0ba1b54c6ec63e6fcce9808807 to your computer and use it in GitHub Desktop.
some handy tools for writing python falcon app
# -*- coding: UTF-8 -*-
# author: [email protected]
""" Example:
# -*- coding: UTF-8 -*-
from ..common import patch_io # noqa
from ..common import idol
from ..common.falconkit import create_app
from ..model import database
from . import view
class ResourceReleaseMiddleware():
def process_response(self, req, resp, resource, req_succeeded):
database.close()
middleware = [ResourceReleaseMiddleware()]
idol.rise()
app = application = create_app(module=view, middleware=middleware)
"""
from logzero import logger as LOG
from marshmallow.exceptions import ValidationError
from marshmallow import Schema
import falcon
import functools
import logging
import logzero
import pike.discovery.py as discovery
import traceback
import ujson
import os
logzero.formatter(logging.Formatter('[%(levelname)s] %(message)s'))
def _catch_all(error, req, resp, params):
if not isinstance(error, falcon.HTTPError):
detail = error.message if hasattr(error, 'message') else ''
tb = ''.join(traceback.format_tb(error.__traceback__))
LOG.error('v' * 78)
LOG.error(f'{error} {type(error)} \n{tb}'.strip())
LOG.error('^' * 78)
error = dict(reason=str(error), detail=detail, code=-1)
if bool(os.environ.get('DEBUG', '')):
error['traceback'] = tb
raise falcon.HTTPStatus(falcon.HTTP_INTERNAL_SERVER_ERROR,
body=ujson.dumps(error))
else:
raise error
def _catch_validation(error, req, resp, params):
body = ujson.dumps(dict(reason='VALIDATION_ERROR',
detail=error.args[0],
code=1))
raise falcon.HTTPStatus(falcon.HTTP_BAD_REQUEST, body=body)
def create_app(module, middleware=[]):
app = falcon.API(middleware=middleware, independent_middleware=True)
num_routes = 0
for v in discovery.get_all_classes(module):
if hasattr(v, 'route') and isinstance(v.route, str):
LOG.info(f'Adding route {v.route} from {v.__name__} ...')
app.add_route(v.route, v())
num_routes += 1
LOG.info(f'#{num_routes} routes has been successfully added.')
# add order matters
app.add_error_handler(Exception, _catch_all)
app.add_error_handler(ValidationError, _catch_validation)
return app
def schema(query=None, data=None, reply=None):
query_schema, data_schema = query, data
def _urldecode(schema, formdata):
data = {}
for key, value in formdata.items():
if not key.endswith('[]'):
data[key] = value[-1]
else:
data[key] = value
return schema.load(data).data
def wrapper(function):
@functools.wraps(function)
def f(handler, *args, **kwargs):
req, resp, *_ = args
if query_schema and isinstance(query_schema, Schema):
kwargs.update({'query': _urldecode(query_schema, req.params)})
if data_schema and isinstance(data_schema, Schema):
body = req.stream.read().decode('UTF-8')
ctype = req.content_type or ''
if ctype.startswith('application/json'):
body = body if body else '{}'
kwargs.update({'data': data_schema.loads(body).data})
elif ctype.startswith('application/x-www-form-urlencoded'):
params = falcon.uri.parse_query_string(body)
kwargs.update({'data': _urldecode(data_schema, params)})
else:
kwargs.update({'data': body})
retval = function(handler, *args, **kwargs)
if reply and isinstance(reply, Schema):
resp.content_type = falcon.MEDIA_JSON
if isinstance(retval, tuple) and len(retval) > 1:
code, data = int(retval[0]), reply.dump(retval[1]).data
if len(retval) > 2:
resp.status = retval[2]
else:
code, data = 0, reply.dump(retval).data
resp.body = ujson.dumps(dict(code=code, data=data))
elif isinstance(retval, dict):
resp.content_type = falcon.MEDIA_JSON
resp.body = ujson.dumps(retval)
else:
resp.content_type = falcon.MEDIA_TEXT
resp.body = str(retval)
return retval
return f
return wrapper
# -*- coding: UTF-8 -*-
# author: [email protected]
from gevent import monkey
from gevent.socket import wait_read, wait_write
from psycopg2 import extensions
# Call this function after monkey-patching socket (etc).
def patch_psycopg2():
extensions.set_wait_callback(_psycopg2_gevent_callback)
def _psycopg2_gevent_callback(conn, timeout=None):
while True:
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
wait_read(conn.fileno(), timeout=timeout)
elif state == extensions.POLL_WRITE:
wait_write(conn.fileno(), timeout=timeout)
else:
raise ValueError('poll() returned unexpected result')
monkey.patch_all()
patch_psycopg2()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment