Created
February 12, 2010 14:43
-
-
Save asplake/302617 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
For Pylons 0.9.7, optionally with asplake's Routes fork with {.format} parameter support
1) fill_render(), a render() that encodes repeating groups properly | |
2) A refactored @validate with | |
a) JSON support | |
b) cleaned-up form_errors that render properly in the presence of repeating groups | |
c) some possibility of extensibility | |
3) JSON-related helpers: sent_json(), accepts_json(), render_json() | |
4) formatted_url(), a url() that remembers any format extension on the request | |
5) BaseSchema, a formencode.Schema with sensible defaults | |
Further to 2c, validation can be done without the @validate decorator like so:

    def update(self):
        try:
            self._parse(request, schema=MyForm())
            # Here self.form_result is populated as normal
            # as is self.params, the decoded but unconverted form params or
            # json request. You could do more stuff here that raises
            # formencode.Invalid, validated model updates for example.
        except Invalid as e:
            return self._render_invalid(e, form='edit')
        # Do more stuff with self.form_result, e.g.
        if accepts_json():
            return render_json(json_serialisable_thing_made_from_form_result)
        else:
            return render('template')

See: | |
[1] @validate revisited, JSON support, content negotiation - http://groups.google.com/group/pylons-discuss/browse_thread/thread/927f4367d5367fc2/c0e325c804680bb6?lnk=gst&q=revisited#c0e325c804680bb6 | |
[2] asplake/routes - http://bitbucket.org/asplake/routes/ | |
[3] Experimental {.format} in Routes - http://positiveincline.com/?p=617 | |
""" | |
from decorator import decorator | |
import json | |
import logging | |
import formencode | |
from formencode import htmlfill | |
from formencode import variabledecode | |
from formencode.schema import format_compound_error | |
from pylons import request, response, session, tmpl_context as c, url | |
from pylons.controllers import WSGIController | |
from pylons.controllers.util import abort | |
from pylons.decorators import (PylonsFormEncodeState, | |
determine_response_charset, | |
encode_formencode_errors) | |
from pylons.templating import render_mako as render | |
from webob.multidict import UnicodeMultiDict | |
log = logging.getLogger(__name__) | |
def fill_render(template_name, values):
    """Render ``template_name`` and fill the resulting markup from ``values``.

    ``values`` is flattened with ``variabledecode.variable_encode`` before
    being handed to ``htmlfill.render`` as the form defaults, so nested or
    repeating structures are filled in under their flat field names.
    """
    markup = render(template_name)
    flat_values = variabledecode.variable_encode(values)
    return htmlfill.render(markup, flat_values)
def url_format_extension():
    """Return the ``format`` routing argument of the current request.

    The value is looked up in the second element of
    ``request.environ['wsgiorg.routing_args']``.  ``None`` is returned when
    the environ has no such entry or the entry has no ``format`` key.
    """
    environ = request.environ
    if 'wsgiorg.routing_args' not in environ:
        return None
    named_args = environ['wsgiorg.routing_args'][1]
    return named_args.get('format')
def sent_json():
    """Tell whether the current request carries a JSON body.

    True when the URL format extension is ``json`` or when the request's
    ``content-type`` header mentions ``application/json``.
    """
    if url_format_extension() == 'json':
        return True
    content_type = request.headers.get('content-type', '')
    return 'application/json' in content_type
def accepts_json():
    """Tell whether the current request asks for a JSON response.

    True when the URL format extension is ``json`` or when the request's
    ``accept`` header mentions ``application/json``.
    """
    if url_format_extension() == 'json':
        return True
    accept = request.headers.get('accept', '')
    return 'application/json' in accept
def render_json(thing, **json_options):
    """Serialise ``thing`` as JSON and label the response accordingly.

    Sets the response ``Content-Type`` header to ``application/json`` and
    returns ``json.dumps(thing, **json_options)``.
    """
    headers = response.headers
    # The header is set before serialising, so it is in place even if
    # json.dumps() raises.
    headers['Content-Type'] = 'application/json'
    serialised = json.dumps(thing, **json_options)
    return serialised
def formatted_url(*args, **params):
    """Build a URL with ``url()``, carrying over the request's format.

    The ``format`` argument defaults to the format extension of the
    current request (see ``url_format_extension``).  A caller may override
    it by passing ``format=...`` explicitly; all other positional and
    keyword arguments are passed straight through to ``url()``.
    """
    # setdefault() rather than a literal ``format=`` keyword: passing the
    # keyword twice would raise "got multiple values for keyword argument"
    # whenever the caller supplies its own format.
    params.setdefault('format', url_format_extension())
    return url(*args, **params)
class BaseSchema(formencode.Schema):
    """Form schema base class with the defaults used throughout this module.

    Subclass it and declare fields as with any ``formencode.Schema``.
    """

    # Run FormEncode's NestedVariables on the input before the schema's own
    # fields are validated.
    pre_validators = [variabledecode.NestedVariables()]

    # FormEncode Schema options for input keys the schema does not declare.
    allow_extra_fields = True
    filter_extra_fields = True
class BaseController(WSGIController):
    """Controller base class holding the pieces of the ``validate`` flow.

    The work is split into overridable steps:

    * ``_get_decoded`` - fetch the submitted parameters (form or JSON)
    * ``_convert``     - run them through a schema and/or validators
    * ``_parse``       - the two steps above; raises ``formencode.Invalid``
    * ``_render_invalid`` - produce the error response (JSON or htmlfill)

    After ``_parse`` has run, ``self.params`` holds the decoded input,
    ``self.form_result`` the converted values and ``self.form_errors`` the
    flattened error dict.
    """

    def __init__(self, *args, **kwargs):
        super(BaseController, self).__init__(*args, **kwargs)
        # These are set by _parse() (and therefore by @validate).  They are
        # defined up front so that reading them before any validation has
        # run yields a value instead of raising AttributeError.
        self.form_result = None
        self.form_errors = {}
        self.params = None

    def __call__(self, environ, start_response):
        """Invoke the Controller"""
        # WSGIController.__call__ dispatches to the Controller method
        # the request is routed to. This routing information is
        # available in environ['pylons.routes_dict']
        try:
            return WSGIController.__call__(self, environ, start_response)
        finally:
            # NOTE(review): ``meta`` is not imported in the visible part of
            # this module - confirm the project's ``meta`` (the object
            # exposing ``Session``) is brought into scope.
            meta.Session.remove()

    def _get_decoded(self, variable_decode=False, dict_char='.',
                     list_char='-', post_only=True):
        """Return the submitted parameters, before any validation.

        ``post_only``
            When true, use the JSON-decoded request body if ``sent_json()``
            says the request is JSON, else ``request.POST``.  When false,
            use ``request.params``.
        ``variable_decode``, ``dict_char``, ``list_char``
            When ``variable_decode`` is true the parameters are passed
            through ``variabledecode.variable_decode`` with the given
            separator characters.

        A body that is not valid JSON makes ``json.loads`` raise; that
        exception is not handled here.
        """
        # If they want post args only, use just the post args
        if post_only:
            if sent_json():
                params = json.loads(request.body)
            else:
                params = request.POST
        else:
            params = request.params

        # Collapse multidict-like objects; a decoded JSON body has no
        # ``mixed`` method and is left alone.
        if hasattr(params, 'mixed'):
            params = params.mixed()

        if variable_decode:
            log.debug("Running variable_decode on params")
            return variabledecode.variable_decode(params, dict_char,
                                                  list_char)
        return params

    def _convert(self, decoded, schema=None, validators=None, state=None,
                 variable_decode=False, dict_char='.', list_char='-'):
        """Validate ``decoded`` and return ``(converted, errors)``.

        ``schema`` is applied first, then each entry of ``validators`` (a
        dict mapping field name to validator) is applied to the matching
        field.  Nothing is raised for validation failures: they are
        collected, flattened with ``variabledecode.variable_encode`` and
        returned as the second element, with empty values and the
        ``--repetitions`` bookkeeping keys removed.
        """
        converted = {}
        errors = {}

        if schema:
            log.debug("Validating against a schema")
            try:
                converted = schema.to_python(decoded, state)
            except formencode.Invalid as e:
                errors = e.unpack_errors(variable_decode, dict_char,
                                         list_char)

        if validators:
            log.debug("Validating against provided validators")
            if isinstance(validators, dict):
                for field, validator in validators.items():
                    try:
                        converted[field] = \
                            validator.to_python(decoded.get(field), state)
                    except formencode.Invalid as error:
                        # NOTE(review): the Invalid object itself is stored
                        # here (not its message); it would reach
                        # json.dumps() via render_json() on the JSON error
                        # path - confirm that is intended.
                        errors[field] = error

        # remove cruft
        clean_errors = dict(
            (k, v)
            for k, v in variabledecode.variable_encode(errors).items()
            if v and not k.endswith('--repetitions'))
        return converted, clean_errors

    def _parse(self, request, schema=None, validators=None, state=None,
               variable_decode=False, dict_char='.', list_char='-',
               on_get=False, post_only=True):
        """Decode and validate the request, raising on failure.

        Sets ``self.params``, ``self.form_result`` and ``self.form_errors``
        and returns ``self.form_result``.  Returns ``{}`` without touching
        those attributes when the request is a GET and ``on_get`` is false.

        Raises ``formencode.Invalid`` (carrying ``self.form_errors`` as its
        ``error_dict``) when there are validation errors.
        """
        if state is None:
            state = PylonsFormEncodeState

        # Skip the validation if on_get is False and it's a GET
        if not on_get and request.environ['REQUEST_METHOD'] == 'GET':
            return {}

        self.params = self._get_decoded(
            variable_decode, dict_char, list_char,
            post_only)
        self.form_result, self.form_errors = self._convert(
            self.params, schema, validators, state,
            variable_decode, dict_char, list_char)

        if self.form_errors:
            raise formencode.Invalid(
                format_compound_error(self.form_errors),
                self.params, state,
                error_dict=self.form_errors)
        return self.form_result

    def _render_invalid(self, invalid=None, errors=None, params=None,
                        form=None, htmlfill_kwargs=None,
                        func=None, *args, **kwargs):
        """Build the response for a failed validation.

        ``invalid``
            The caught ``formencode.Invalid``; used to obtain ``errors``
            when those are not given.
        ``errors`` / ``params``
            Default to ``self.form_errors`` / ``self.params``.
        ``form``
            Name of the action that renders the form.  It is dispatched to
            as a GET and its output is filled with ``params`` and
            ``errors`` via ``htmlfill.render``.
        ``func``, ``*args``, ``**kwargs``
            Called as ``func(self, *args, **kwargs)`` when no ``form`` is
            given.
        ``htmlfill_kwargs``
            Extra keyword arguments for ``htmlfill.render``.

        When the client accepts JSON the errors are returned as
        ``{"errors": ...}`` and no form is rendered.

        Raises ``TypeError`` when neither ``form`` nor ``func`` is given
        (and the client does not accept JSON).
        """
        log.debug("Rendering errors found in validation")
        if errors is None:
            if invalid and hasattr(invalid, 'unpack_errors'):
                errors = invalid.unpack_errors()
            else:
                errors = self.form_errors
        if params is None:
            params = self.params
        if htmlfill_kwargs is None:
            htmlfill_kwargs = {}

        if accepts_json():
            return render_json(dict(errors=errors))

        # From here on the request is treated as a GET, so the form action
        # (or func) renders rather than validates again.
        request.environ['REQUEST_METHOD'] = 'GET'

        # If there's no form supplied, just continue with the current
        # function call.
        if not form:
            if func:
                return func(self, *args, **kwargs)
            raise TypeError("Neither form nor func supplied")

        request.environ['pylons.routes_dict']['action'] = form
        # Named ``form_response`` so the module-level ``response`` import
        # is not shadowed inside this method.
        form_response = self._dispatch_call()

        # XXX: Legacy WSGIResponse support
        legacy_response = False
        if hasattr(form_response, 'content'):
            form_content = ''.join(form_response.content)
            legacy_response = True
        else:
            form_content = form_response
            form_response = self._py_object.response

        # If the form_content is an exception response, return it
        if hasattr(form_content, '_exception'):
            return form_content

        # Ensure htmlfill can safely combine the form_content, params and
        # errors variables (that they're all of the same string type)
        if not isinstance(params, UnicodeMultiDict):
            log.debug("Raw string form params: ensuring the '%s' form and "
                      "FormEncode errors are converted to raw strings for "
                      "htmlfill", form)
            encoding = determine_response_charset(form_response)

            # WSGIResponse's content may (unlikely) be unicode
            if isinstance(form_content, unicode):
                form_content = form_content.encode(encoding)

            # FormEncode>=0.7 errors are unicode (due to being localized
            # via ugettext). Convert any of the possible formencode
            # unpack_errors formats to contain raw strings
            errors = encode_formencode_errors(errors, encoding)
        elif not isinstance(form_content, unicode):
            log.debug("Unicode form params: ensuring the '%s' form is "
                      "converted to unicode for htmlfill", form)
            encoding = determine_response_charset(form_response)
            form_content = form_content.decode(encoding)

        form_content = htmlfill.render(form_content, defaults=params,
                                       errors=errors, **htmlfill_kwargs)
        if legacy_response:
            # Let the Controller merge the legacy response
            form_response.content = form_content
            return form_response
        return form_content
def validate(schema=None, validators=None, form=None, variable_decode=False,
             dict_char='.', list_char='-', post_only=True, state=None,
             on_get=False, **htmlfill_kwargs):
    """The Pylons @validate decorator refactored, with most of the work done
    by controller methods defined on BaseController. Enhanced to accept JSON.

    Validate input either for a FormEncode schema, or individual
    validators

    Given a form schema or dict of validators, validate will attempt to
    validate the schema or validator list.

    If validation was successful, the valid result dict will be saved
    as ``self.form_result``. Otherwise, the action will be re-run as if
    it was a GET, and the output will be filled by FormEncode's
    htmlfill to fill in the form field errors.

    ``schema``
        Refers to a FormEncode Schema object to use during validation.
    ``validators``
        Dict mapping field names to individual FormEncode validators.
    ``form``
        Method used to display the form, which will be used to get the
        HTML representation of the form for error filling.
    ``variable_decode``
        Boolean to indicate whether FormEncode's variable decode
        function should be run on the form input before validation.
    ``dict_char``
        Passed through to FormEncode. Toggles the form field naming
        scheme used to determine what is used to represent a dict. This
        option is only applicable when used with variable_decode=True.
    ``list_char``
        Passed through to FormEncode. Toggles the form field naming
        scheme used to determine what is used to represent a list. This
        option is only applicable when used with variable_decode=True.
    ``post_only``
        Boolean that indicates whether or not GET (query) variables
        should be included during validation.

        .. warning::
            ``post_only`` applies to *where* the arguments to be
            validated come from. It does *not* restrict the form to
            only working with post, merely only checking POST vars.
    ``state``
        Passed through to FormEncode for use in validators that utilize
        a state object.
    ``on_get``
        Whether to validate on GET requests. By default only POST
        requests are validated.
    ``**htmlfill_kwargs``
        Passed through to ``htmlfill.render`` when the form is re-rendered
        with errors.

    Example::

        class SomeController(BaseController):

            def create(self, id):
                return render('/myform.mako')

            @validate(schema=model.forms.myschema(), form='create')
            def update(self, id):
                # Do something with self.form_result
                pass

    """
    if state is None:
        state = PylonsFormEncodeState

    def wrapper(func, self, *args, **kwargs):
        """Decorator Wrapper function"""
        request = self._py_object.request

        # Skip the validation if on_get is False and it's a GET.  _parse()
        # makes the same check, but here the action must still be called.
        if not on_get and request.environ['REQUEST_METHOD'] == 'GET':
            return func(self, *args, **kwargs)

        try:
            self._parse(request, schema, validators, state,
                        variable_decode, dict_char, list_char, on_get,
                        post_only)
        except formencode.Invalid as e:
            return self._render_invalid(e, self.form_errors, self.params,
                                        form, htmlfill_kwargs,
                                        func, *args, **kwargs)
        return func(self, *args, **kwargs)
    return decorator(wrapper)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment