Last active
August 29, 2015 14:09
-
-
Save mmulich/d6350493e63c1a91b67e to your computer and use it in GitHub Desktop.
Non-conflicting pyramid applications using wsgi_intercept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyramid.config import ( | |
global_registries, | |
ApplicationCreated, | |
Configurator as BaseConfigurator, | |
) | |
from pyramid.threadlocal import ( | |
defaults as threadlocal_defaults, | |
manager as actual_threadlocal_manager, | |
ThreadLocalManager, | |
) | |
from pyramid.router import Router as BaseRouter | |
other_threadlocal_manager = ThreadLocalManager(default=threadlocal_defaults) | |
# A modified router to use a separate threadlocal_manager | |
class Router(BaseRouter): | |
"""Custom router""" | |
other_threadlocal_manager = other_threadlocal_manager | |
# A modified configurator to return a custom router | |
class Configurator(BaseConfigurator): | |
other_threadlocal_manager = other_threadlocal_manager | |
def make_wsgi_app(self): | |
"""This is mostly a straight copy of the original.""" | |
self.commit() | |
app = Router(self.registry) | |
# Allow tools like "pshell development.ini" to find the 'last' | |
# registry configured. | |
global_registries.add(self.registry) | |
# Push the registry onto the stack in case any code that depends on | |
# the registry threadlocal APIs used in listeners subscribed to the | |
# IApplicationCreated event. | |
self.manager.push({'registry':self.registry, 'request':None}) | |
try: | |
self.registry.notify(ApplicationCreated(app)) | |
finally: | |
self.manager.pop() | |
return app | |
@property | |
def _threadlocal_manager(obj): | |
if isinstance(obj, (Router, Configurator,)): | |
return obj.other_threadlocal_manager | |
elif isinstance(obj, (BaseRouter, BaseConfigurator)): | |
return actual_threadlocal_manager | |
else: | |
raise Exception("Ouch!") | |
def monkey_patch_pyramid(): | |
BaseConfigurator.manager = _threadlocal_manager | |
BaseRouter.threadlocal_manger = _threadlocal_manager | |
################## | |
from wsgiref.simple_server import make_server | |
import requests | |
from pyramid.response import Response | |
from zope.interface import implementer, Interface | |
class INamer(Interface): | |
"""a naming thing""" | |
@implementer(INamer) | |
class Namer(object): | |
def __init__(self, name): | |
self.name = name | |
def __call__(self): | |
return self.name | |
def publishing_view(request): | |
name = request.registry.getUtility(INamer)() | |
body = '{"name": "%s"}' % (name) | |
return Response( | |
content_type="application/json", | |
body=body | |
) | |
def publishing(): | |
config = Configurator() | |
config.add_route('view', '/') | |
config.add_view(publishing_view, route_name='view') | |
config.registry.registerUtility(Namer('fred')) | |
app = config.make_wsgi_app() | |
return app | |
##################### | |
def authoring_view(request): | |
name = request.registry.getUtility(INamer)() | |
yours = requests.get('http://localhost:9000/').json()['name'] | |
body = '{"name": "%s", "yours": "%s"}' % (name, yours) | |
return Response( | |
content_type="application/json", | |
body=body | |
) | |
def authoring(): | |
config = BaseConfigurator() | |
config.add_route('view', '/') | |
config.add_view(authoring_view, route_name='view') | |
config.registry.registerUtility(Namer('smoo')) | |
app = config.make_wsgi_app() | |
return app | |
###################### | |
if __name__ == '__main__': | |
app = publishing() | |
server = make_server('0.0.0.0', 9000, app) | |
server.serve_forever() | |
###################### | |
import unittest | |
import webtest | |
from wsgi_intercept import requests_intercept, add_wsgi_intercept | |
class TestCase(unittest.TestCase): | |
def test(self): | |
monkey_patch_pyramid() | |
# The remote application we want to test interactions with. | |
requests_intercept.install() | |
add_wsgi_intercept('localhost', 9000, publishing) | |
resp = requests.get('http://localhost:9000/') | |
self.assertEqual(resp.json(), {'name': 'fred'}) | |
# The local application | |
app = webtest.TestApp(authoring()) | |
resp = app.get('/') | |
self.assertEqual(resp.json, {'name': 'smoo', 'yours': 'fred'}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment