Created
January 3, 2011 05:54
-
-
Save mcdonc/763162 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
import xmlrpclib | |
from zope.interface import implements | |
from zope.interface import providedBy | |
from pyramid.interfaces import IViewClassifier | |
from pyramid.interfaces import IView | |
from pyramid.interfaces import IViewMapperFactory | |
from pyramid.events import NewRequest | |
from pyramid.exceptions import NotFound | |
from pyramid.exceptions import Forbidden | |
from pyramid.traversal import traverse | |
from pyramid.security import has_permission | |
from pyramid.view import view_config | |
class MapplyViewMapper(object): | |
implements(IViewMapperFactory) | |
def __init__(self, **kw): | |
self.attr = kw.get('attr') | |
def __call__(self, view): | |
attr = self.attr | |
if inspect.isclass(view): | |
def _class_view(context, request): | |
params = getattr(request, 'xmlrpc_params', ()) | |
keywords = dict(request.params.items()) | |
if request.matchdict: | |
keywords.update(request.matchdict) | |
if attr is None: | |
inst = view(request) | |
response = self.mapply(inst, params, keywords) | |
else: | |
inst = view(request) | |
response = self.mapply(getattr(inst, attr), params, | |
keywords) | |
request.__view__ = inst | |
return response | |
mapped_view = _class_view | |
else: | |
def _nonclass_view(context, request): | |
params = (request,) + getattr(request, 'xmlrpc_params', ()) | |
keywords = dict(request.params.items()) | |
if request.matchdict: | |
keywords.update(request.matchdict) | |
if attr is None: | |
response = self.mapply(view, params, keywords) | |
else: | |
response = self.mapply(getattr(view, attr), params, | |
keywords) | |
return response | |
mapped_view = _nonclass_view | |
return mapped_view | |
def mapply(self, ob, positional, keyword): | |
f = ob | |
im = False | |
if hasattr(f, 'im_func'): | |
im = True | |
elif not hasattr(f, 'func_defaults'): | |
if hasattr(f, '__call__'): | |
f = f.__call__ | |
if hasattr(f, 'im_func'): | |
im = True | |
if im: | |
f = f.im_func | |
c = f.func_code | |
defaults = f.func_defaults | |
names = c.co_varnames[1:c.co_argcount] | |
else: | |
defaults = f.func_defaults | |
c = f.func_code | |
names = c.co_varnames[:c.co_argcount] | |
nargs = len(names) | |
args = [] | |
if positional: | |
positional = list(positional) | |
if len(positional) > nargs: | |
raise TypeError('too many arguments') | |
args = positional | |
get = keyword.get | |
nrequired = len(names) - (len(defaults or ())) | |
for index in range(len(args), len(names)): | |
name = names[index] | |
v = get(name, args) | |
if v is args: | |
if index < nrequired: | |
raise TypeError('argument %s was omitted' % name) | |
else: | |
v = defaults[index-nrequired] | |
args.append(v) | |
args = tuple(args) | |
return ob(*args) | |
def parse_xmlrpc_request(request): | |
""" Deserialize the body of a request from an XML-RPC request | |
document into a set of params and return a two-tuple. The first | |
element in the tuple is the method params as a sequence, the | |
second element in the tuple is the method name.""" | |
if request.content_length > (1 << 23): | |
# protect from DOS (> 8MB body), webob will only read CONTENT_LENGTH | |
# bytes when body is accessed, so no worries about getting a | |
# bogus CONTENT_LENGTH header | |
raise ValueError('Body too large (%s bytes)' % request.content_length) | |
params, method = xmlrpclib.loads(request.body, use_datetime=True) | |
return params, method | |
class xmlrpc_config(view_config): | |
""" This decorator acts almost exactly like | |
:class:`pyramid.view.view_config` but it produces a view configuration | |
which can call an XMLRPC callable rather than a standard Pyramid view | |
callable. | |
An XMLRPC callable is one which accepts a variable argument list, which | |
will be populated by items available from the XMLRPC params list, | |
``request.params`` and ``request.matchdict``. Its first argument must be | |
``request``, the other arguments will be populated from the available | |
parameters. | |
E.g.:: | |
from pyramid.xmlrpc import xmlrpc_config | |
@xmlrpc_config() | |
def say(request, what): | |
return {'say':what} | |
""" | |
def __init__(self, name='', request_type=None, for_=None, permission=None, | |
route_name=None, request_method=None, request_param=None, | |
containment=None, attr=None, wrapper=None, xhr=False, | |
accept=None, header=None, path_info=None, custom_predicates=(), | |
context=None, view_mapper=MapplyViewMapper, | |
renderer='xmlrpc'): | |
self.name = name | |
self.request_type = request_type | |
self.context = context or for_ | |
self.permission = permission | |
self.route_name = route_name | |
self.request_method = request_method | |
self.request_param = request_param | |
self.containment = containment | |
self.attr = attr | |
self.wrapper = wrapper | |
self.xhr = xhr | |
self.accept = accept | |
self.header = header | |
self.path_info = path_info | |
self.custom_predicates = custom_predicates | |
self.renderer = renderer | |
self.view_mapper = view_mapper | |
self.custom_predicates = tuple(custom_predicates) + (is_xmlrpc_request,) | |
def xmlrpc_renderer_factory(info): | |
def _render(value, system): | |
request = system.get('request') | |
if request is not None: | |
if not hasattr(request, 'response_content_type'): | |
request.response_content_type = 'text/xml' | |
if isinstance(value, xmlrpclib.Fault): | |
return xmlrpclib.dumps(value) | |
else: | |
return xmlrpclib.dumps((value,), methodresponse=True) | |
return _render | |
def xmlrpc_traversal_view(context, request): | |
# duplicate some logic from router to do Zope-style traversal and view | |
# lookup. | |
params, method = request.xmlrpc_params, request.xmlrpc_method | |
names = method.split('.') | |
info = traverse(context, '/'.join(names)) | |
inner_context = info['context'] | |
view_name = info['view_name'] | |
if view_name == '__call__': | |
view_name = '' | |
if view_name == '__repr__': | |
def view(context, request): | |
if has_permission(context, request, 'view'): | |
return repr(context) | |
raise Forbidden('No "view" permission on context') | |
else: | |
provides = [IViewClassifier] + map(providedBy, (request, inner_context)) | |
reg = request.registry | |
view = reg.adapters.lookup(provides, IView, view_name, default=None) | |
if getattr(view, '__original_view__', None) is xmlrpc_traversal_view: | |
view = None | |
if view is None: | |
raise NotFound(method) | |
request.__dict__.update(info) | |
return view(inner_context, request) | |
def is_xmlrpc_request(context, request): | |
return bool(getattr(request, 'is_xmlrpc', False)) | |
def _set_xmlrpc_params(event, override): | |
request = event.request | |
if (request.content_type == 'text/xml' | |
and request.method == 'POST' | |
and not 'soapaction' in request.headers | |
and not 'x-pyramid-avoid-xmlrpc' in request.headers): | |
params, method = parse_xmlrpc_request(request) | |
request.xmlrpc_params, request.xmlrpc_method = params, method | |
request.is_xmlrpc = True | |
if override: | |
request.override_renderer = 'xmlrpc' | |
return True | |
def set_xmlrpc_params_omnipresent(event): | |
return _set_xmlrpc_params(event, override=True) | |
def set_xmlrpc_params(event): | |
return _set_xmlrpc_params(event, override=False) | |
def limited(config): | |
config.add_renderer('xmlrpc', xmlrpc_renderer_factory) | |
config.add_subscriber(set_xmlrpc_params, NewRequest) | |
def omnipresent(config): | |
config.add_renderer('xmlrpc', xmlrpc_renderer_factory) | |
config.add_subscriber(set_xmlrpc_params_omnipresent, NewRequest) | |
config.add_view( | |
xmlrpc_traversal_view, | |
renderer='xmlrpc', | |
custom_predicates=(is_xmlrpc_request,) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment