Created
February 11, 2010 15:19
-
-
Save asplake/301613 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
UPDATED 2010-03-02 - remove duplicated on_get handling, incorporate simplifications from Pylons 1.0b | |
For Pylons 0.97 with asplake's Routes fork with {.format} parameter support | |
1) fill_render(), a render() that encodes repeating groups properly | |
2) A refactored @validate with | |
a) JSON support | |
b) cleaned-up form_errors that render properly in the presence of repeating groups | |
c) some possibility of extensibility | |
3) JSON-related helpers: sent_json(), accepts_json(), render_json() | |
4) formatted_url(), a url() that remembers any format extension on the request | |
5) BaseSchema, a formencode.Schema with sensible defaults | |
See: | |
[1] @validate revisited, JSON support, content negotiation - http://groups.google.com/group/pylons-discuss/browse_thread/thread/927f4367d5367fc2/c0e325c804680bb6?lnk=gst&q=revisited#c0e325c804680bb6 | |
[2] asplake/routes - http://bitbucket.org/asplake/routes/ | |
[3] Experimental {.format} in Routes - http://positiveincline.com/?p=617 | |
""" | |
from decorator import decorator | |
import json | |
import logging | |
import formencode | |
from formencode import htmlfill | |
from formencode import variabledecode | |
from formencode.schema import format_compound_error | |
from pylons import request, response, session, tmpl_context as c, url | |
from pylons.controllers import WSGIController | |
from pylons.controllers.util import abort | |
from pylons.decorators import PylonsFormEncodeState | |
from pylons.templating import render_mako as render | |
from webob.multidict import UnicodeMultiDict | |
log = logging.getLogger(__name__) | |
def fill_render(template_name, values):
    """Render a Mako template and pre-fill its form fields from ``values``.

    ``values`` may be nested (dicts / lists, i.e. repeating groups).  It is
    flattened with FormEncode's ``variable_encode`` before being handed to
    htmlfill as the defaults for the rendered markup.
    """
    # Render first, then flatten: same evaluation order as a single nested
    # call expression would give.
    markup = render(template_name)
    flat_defaults = variabledecode.variable_encode(values)
    return htmlfill.render(markup, flat_defaults)
def url_format_extension():
    """Return the ``format`` routing argument of the current request.

    Yields ``None`` when the WSGI environ has no ``wsgiorg.routing_args``
    entry, or when that entry carries no ``format`` argument.
    """
    try:
        routing_args = request.environ['wsgiorg.routing_args']
    except KeyError:
        return None
    # Element 1 of the routing args is used as the mapping of named
    # route arguments.
    named_args = routing_args[1]
    return named_args.get('format')
def sent_json():
    """Tell whether the current request's payload should be read as JSON.

    True when the URL has a ``json`` format extension, or when the
    ``Content-Type`` request header mentions ``application/json``.
    """
    if url_format_extension() == 'json':
        return True
    content_type = request.headers.get('content-type', '')
    return 'application/json' in content_type
def accepts_json():
    """Tell whether the client wants a JSON response.

    True when the URL has a ``json`` format extension, or when the
    ``Accept`` request header mentions ``application/json``.
    """
    if url_format_extension() == 'json':
        return True
    accept_header = request.headers.get('accept', '')
    return 'application/json' in accept_header
def render_json(thing, **json_options):
    """Serialise ``thing`` as JSON and mark the response accordingly.

    Sets the response ``Content-Type`` header to ``application/json`` and
    returns the serialised string.  Extra keyword arguments are forwarded
    to ``json.dumps`` unchanged.
    """
    # The header is set before serialising, so it is in place even if
    # ``json.dumps`` raises.
    response.headers['Content-Type'] = 'application/json'
    payload = json.dumps(thing, **json_options)
    return payload
def formatted_url(*args, **params):
    """Build a URL that keeps the current request's format extension.

    Works like ``url()`` but fills in ``format`` from
    ``url_format_extension()``.  If the caller passes ``format`` explicitly,
    that value is used instead (previously this raised ``TypeError`` because
    the keyword was supplied twice).
    """
    params.setdefault('format', url_format_extension())
    return url(*args, **params)
class BaseSchema(formencode.Schema):
    """Base form schema with the defaults used throughout this module.

    Subclass this instead of ``formencode.Schema`` to inherit the settings
    below.
    """

    # Run FormEncode's NestedVariables over the input before the schema's
    # own fields are validated.
    pre_validators = [variabledecode.NestedVariables()]

    # Fields not declared on the schema are accepted ...
    allow_extra_fields = True
    # ... and filtered out of the converted result.
    filter_extra_fields = True
class BaseController(WSGIController):
    """Base controller hosting the machinery behind the ``@validate`` decorator.

    The decorator delegates to ``_parse`` (decode + validate the request) and
    ``_render_invalid`` (produce the error response).  Subclasses may override
    the individual ``_get_decoded`` / ``_convert`` / ``_parse`` /
    ``_render_invalid`` steps to customise behaviour.

    After ``_parse`` has run past its GET short-circuit the instance carries:

    ``params``       the decoded request parameters
    ``form_result``  the converted (validated) values
    ``form_errors``  a flat dict of error messages keyed by encoded field name
    """

    def __init__(self, *args, **kwargs):
        super(BaseController, self).__init__(*args, **kwargs)
        # These are set by the @validate machinery but not defined on the
        # class; initialise them so they can always be read safely.
        self.form_result = None
        self.form_errors = {}
        self.params = None

    def __call__(self, environ, start_response):
        """Invoke the Controller"""
        # WSGIController.__call__ dispatches to the Controller method
        # the request is routed to. This routing information is
        # available in environ['pylons.routes_dict']
        try:
            return WSGIController.__call__(self, environ, start_response)
        finally:
            # NOTE(review): ``meta`` is not imported anywhere in the visible
            # part of this module, so this line raises NameError unless the
            # application's model ``meta`` module is imported at file level
            # -- confirm and add that import for your project.
            meta.Session.remove()

    def _get_decoded(self, variable_decode=False, dict_char='.',
                     list_char='-', post_only=True):
        """Fetch the request parameters that are to be validated.

        With ``post_only`` the parameters come from the JSON request body
        (when ``sent_json()`` is true) or from ``request.POST``; otherwise
        from ``request.params``.  Multi-dicts are collapsed with ``mixed()``.
        When ``variable_decode`` is true the result is additionally passed
        through FormEncode's ``variable_decode`` using ``dict_char`` and
        ``list_char``.

        A JSON body that cannot be parsed aborts the request with HTTP 400
        instead of surfacing as an unhandled ``ValueError``.
        """
        # If they want post args only, use just the post args
        if post_only:
            if sent_json():
                try:
                    params = json.loads(request.body)
                except ValueError:
                    # json signals malformed input with ValueError (or a
                    # subclass of it); that is a client error, not ours.
                    abort(400, 'Malformed JSON in request body')
            else:
                params = request.POST
        else:
            params = request.params
        if hasattr(params, 'mixed'):
            params = params.mixed()
        if variable_decode:
            log.debug("Running variable_decode on params")
            return variabledecode.variable_decode(params, dict_char,
                                                  list_char)
        return params

    def _convert(self, decoded, schema=None, validators=None, state=None,
                 variable_decode=False, dict_char='.', list_char='-'):
        """Validate ``decoded`` and return ``(converted, clean_errors)``.

        ``schema`` (if given) is applied to the whole input; ``validators``
        (if given as a dict) are applied per field and their results are
        merged into the converted values.  Errors are collected rather than
        raised.  The returned error dict is flattened with ``variable_encode``
        and stripped of empty values and ``--repetitions`` bookkeeping keys.
        """
        converted = {}
        errors = {}
        if schema:
            log.debug("Validating against a schema")
            try:
                converted = schema.to_python(decoded, state)
            except formencode.Invalid as e:
                errors = e.unpack_errors(variable_decode, dict_char,
                                         list_char)
        if validators:
            log.debug("Validating against provided validators")
            if isinstance(validators, dict):
                for field, validator in validators.items():
                    try:
                        converted[field] = \
                            validator.to_python(decoded.get(field), state)
                    except formencode.Invalid as error:
                        # NOTE(review): the exception object itself is stored
                        # here (schema errors above are unpacked); confirm
                        # this is what the JSON error rendering expects.
                        errors[field] = error
        # remove cruft
        clean_errors = dict(
            (k, v)
            for k, v in variabledecode.variable_encode(errors).items()
            if v and not k.endswith('--repetitions'))
        return converted, clean_errors

    def _parse(self, request, schema=None, validators=None, state=None,
               variable_decode=False, dict_char='.', list_char='-',
               on_get=False, post_only=True):
        """Decode and validate the request, storing the outcome on ``self``.

        Returns the converted values on success.  For a GET request with
        ``on_get`` false, validation is skipped entirely and ``{}`` is
        returned without touching ``self.params`` / ``self.form_result`` /
        ``self.form_errors``.

        Raises ``formencode.Invalid`` (carrying ``self.form_errors`` as its
        ``error_dict``) when any validation error was collected.
        """
        if state is None:
            state = PylonsFormEncodeState
        # Skip the validation if on_get is False and its a GET
        if not on_get and request.environ['REQUEST_METHOD'] == 'GET':
            return {}
        self.params = self._get_decoded(
            variable_decode, dict_char, list_char,
            post_only)
        self.form_result, self.form_errors = self._convert(
            self.params, schema, validators, state,
            variable_decode, dict_char, list_char)
        if self.form_errors:
            raise formencode.Invalid(
                format_compound_error(self.form_errors),
                self.params, state,
                error_dict=self.form_errors)
        return self.form_result

    def _render_invalid(self, invalid=None, errors=None, params=None,
                        form=None, htmlfill_kwargs=None,
                        func=None, *args, **kwargs):
        """Build the response for a request that failed validation.

        ``errors`` defaults to ``invalid.unpack_errors()`` when ``invalid``
        offers it, else to ``self.form_errors``; ``params`` defaults to
        ``self.params``.

        * If the client accepts JSON, ``{"errors": ...}`` is returned as JSON.
        * Otherwise the request method is switched to GET and either ``func``
          is called directly (no ``form`` given) or the ``form`` action is
          dispatched and its output is filled in by htmlfill with the
          submitted values and the error messages.

        Raises ``TypeError`` when neither ``form`` nor ``func`` is supplied.
        """
        log.debug("Rendering errors found in validation")
        if errors is None:
            if invalid and hasattr(invalid, 'unpack_errors'):
                errors = invalid.unpack_errors()
            else:
                errors = self.form_errors
        if params is None:
            params = self.params
        if htmlfill_kwargs is None:
            htmlfill_kwargs = {}
        if accepts_json():
            return render_json(dict(errors=errors))
        request.environ['REQUEST_METHOD'] = 'GET'
        # If there's no form supplied, just continue with the current
        # function call.
        if not form:
            if func:
                return func(self, *args, **kwargs)
            raise TypeError("Neither form nor func supplied")
        request.environ['pylons.routes_dict']['action'] = form
        # Named form_content (not ``response``) so the module-level Pylons
        # ``response`` object is not shadowed inside this method.
        form_content = self._dispatch_call()
        # If the form_content is an exception response, return it
        if hasattr(form_content, '_exception'):
            return form_content
        # Work on a copy so the caller's kwargs dict is not mutated.
        htmlfill_kwargs2 = htmlfill_kwargs.copy()
        htmlfill_kwargs2.setdefault('encoding', request.charset)
        return htmlfill.render(form_content, defaults=params, errors=errors,
                               **htmlfill_kwargs2)
def validate(schema=None, validators=None, form=None, variable_decode=False,
             dict_char='.', list_char='-', post_only=True, state=None,
             on_get=False, **htmlfill_kwargs):
    """The Pylons @validate decorator refactored, with most of the work done
    by controller methods defined on BaseController. Enhanced to accept JSON.

    Validate input either for a FormEncode schema, or individual
    validators.

    Given a form schema or dict of validators, validate will attempt to
    validate the schema or validator list.

    If validation was successful, the valid result dict will be saved
    as ``self.form_result`` and the decorated action runs normally.
    Otherwise the controller's ``_render_invalid`` produces the response:
    a JSON ``errors`` document when the client accepts JSON, else the
    ``form`` action (or the decorated action itself when no ``form`` is
    given) is run as if the request was a GET and its output is filled in
    by FormEncode's htmlfill with the field values and errors.

    ``schema``
        Refers to a FormEncode Schema object to use during validation.
    ``validators``
        Dict mapping field names to individual FormEncode validators,
        applied per field in addition to (or instead of) ``schema``.
    ``form``
        Method used to display the form, which will be used to get the
        HTML representation of the form for error filling.
    ``variable_decode``
        Boolean to indicate whether FormEncode's variable decode
        function should be run on the form input before validation.
    ``dict_char``
        Passed through to FormEncode. Toggles the form field naming
        scheme used to determine what is used to represent a dict. This
        option is only applicable when used with variable_decode=True.
    ``list_char``
        Passed through to FormEncode. Toggles the form field naming
        scheme used to determine what is used to represent a list. This
        option is only applicable when used with variable_decode=True.
    ``post_only``
        Boolean that indicates whether or not GET (query) variables
        should be included during validation.

        .. warning::
            ``post_only`` applies to *where* the arguments to be
            validated come from. It does *not* restrict the form to
            only working with post, merely only checking POST vars.
    ``state``
        Passed through to FormEncode for use in validators that utilize
        a state object.
    ``on_get``
        Whether to validate on GET requests. By default GET requests
        skip validation; all other request methods are validated.
    ``**htmlfill_kwargs``
        Extra keyword arguments passed through to ``htmlfill.render``
        when the form is re-rendered with errors.

    Example::

        class SomeController(BaseController):

            def create(self, id):
                return render('/myform.mako')

            @validate(schema=model.forms.myschema(), form='create')
            def update(self, id):
                # Do something with self.form_result
                pass

    """
    if state is None:
        state = PylonsFormEncodeState

    def wrapper(func, self, *args, **kwargs):
        """Decorator Wrapper function"""
        request = self._py_object.request
        try:
            self._parse(request, schema, validators, state,
                        variable_decode, dict_char, list_char, on_get,
                        post_only)
        except formencode.Invalid as e:
            # ``as`` form of the except clause: valid on Python 2.6+ and 3.
            return self._render_invalid(e, self.form_errors, self.params,
                                        form, htmlfill_kwargs,
                                        func, *args, **kwargs)
        return func(self, *args, **kwargs)
    return decorator(wrapper)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment