Last active
February 5, 2020 19:40
-
-
Save tdamsma/d514cf963cde37a234c4d85f6548aa84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Put up a simple webserver for demonstration and testing purposes.""" | |
from functools import partial | |
from wsgiref.simple_server import make_server | |
import zope.sqlalchemy | |
from time import time_ns | |
from pyramid.authentication import AuthTktAuthenticationPolicy | |
from pyramid.authorization import ACLAuthorizationPolicy | |
from pyramid.config import Configurator | |
from pyramid.events import BeforeTraversal | |
from pyramid.response import Response | |
import transaction | |
from pyramid.security import ( | |
ALL_PERMISSIONS, | |
NO_PERMISSION_REQUIRED, | |
Allow, | |
Authenticated, | |
) | |
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, create_engine, func | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import object_session, relationship, sessionmaker, backref | |
from sqlalchemy.schema import MetaData | |
from sqlalchemy.types import Integer, Text | |
# Declarative base for every model in this module; ``metadata`` is the
# MetaData object the base was created with.
Base = declarative_base(metadata=MetaData())
metadata = Base.metadata
class User(Base):
    """A user row: an integer primary key and a free-text name."""

    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(Text)
class LoggedRequest(Base):
    """Persisted record of a single handled request.

    A new instance starts with ``completed`` set to False; ``user_id`` and
    ``duration_ms`` are left for the holder of the instance to fill in.
    """

    __tablename__ = "logged_request"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey(User.id))
    # The timestamp comes from the database via a server-side default.
    record_created = Column(
        DateTime(timezone=True),
        server_default=func.current_timestamp(),
    )
    duration_ms = Column(Integer)
    completed = Column(Boolean)

    # The backref exposes the reverse side as ``User.requests``.
    user = relationship(
        "User",
        primaryjoin="User.id==foreign(LoggedRequest.user_id)",
        backref=backref("requests", lazy=True),
    )

    def __init__(self, request):
        """Create an incomplete record; ``request`` is accepted but not read."""
        self.completed = False

    def __repr__(self):
        """Return a short debug string showing completion state and duration."""
        return f"[LoggedRequest {self.completed=} {self.duration_ms=}]"
def hello_world(request):
    """Respond with a debug rendering of ``request.user.requests``."""
    body = f"{request.user.requests=}"
    return Response(body)
def get_tm_session(session_factory, transaction_manager):
    """Build a session and register it with ``transaction_manager``.

    Args:
        session_factory: Callable that returns a new session.
        transaction_manager: Passed to ``zope.sqlalchemy.register`` as the
            manager the new session is registered with.

    Returns:
        The newly created, registered session.
    """
    tm_session = session_factory()
    zope.sqlalchemy.register(tm_session, transaction_manager=transaction_manager)
    return tm_session
class MyAuthenticationPolicy(AuthTktAuthenticationPolicy):
    """Authentication policy whose unauthenticated user id is hard-coded."""

    def authenticated_userid(self, request):
        """Return ``str(request.user.id)``, or None when there is no user."""
        current = request.user
        if current is None:
            return None
        return str(current.id)

    def unauthenticated_userid(self, request):
        """Return the fixed user id 3, regardless of the request."""
        return 3
def get_user(request):
    """Look up the ``User`` matching ``request.unauthenticated_userid``.

    Returns None when the request carries no user id. Otherwise the lookup
    goes through ``Query.one()``, so exactly one matching row is expected.
    """
    uid = request.unauthenticated_userid
    if uid is None:
        return None
    matching = request.session.query(User).filter_by(id=uid)
    return matching.one()
def request_logger(event):
    """Persist a ``LoggedRequest`` for the event's request and schedule updates.

    The record is committed immediately through a session obtained from
    ``request.registry["session_factory"]`` (not ``request.session``). Two
    callbacks are then registered on the request, each bound to that session
    and record: ``request_finished_callback`` and ``response_callback``.
    """
    request = event.request
    started_ns = time_ns()

    entry = LoggedRequest(request)
    entry.user_id = request.unauthenticated_userid

    # expire_on_commit=False is passed so the record is not expired by the
    # commits that happen here and in the callbacks.
    make_session = request.registry["session_factory"]
    db = make_session(expire_on_commit=False)
    db.add(entry)
    db.commit()

    on_finished = partial(
        request_finished_callback,
        session=db,
        logged_request=entry,
        logged_request_t0=started_ns,
    )
    on_response = partial(response_callback, session=db, logged_request=entry)

    request.add_finished_callback(on_finished)
    request.add_response_callback(on_response)
def request_finished_callback(
    request, session=None, logged_request=None, logged_request_t0=None
):
    """Store the elapsed time on the logged request and release its session.

    Args:
        request: The finished request (not read here).
        session: Session holding ``logged_request``; committed, then closed.
        logged_request: Record whose ``duration_ms`` is updated.
        logged_request_t0: ``time_ns()`` reading taken when logging started.
    """
    elapsed_ns = time_ns() - logged_request_t0
    # duration_ms is declared as an Integer column, so store whole
    # milliseconds instead of the float that true division would produce.
    logged_request.duration_ms = elapsed_ns // 1_000_000
    try:
        session.commit()
    finally:
        # Close the session even when the commit raises, so it is not leaked.
        session.close()
def response_callback(request, response, session=None, logged_request=None):
    """Flag the logged request as completed and commit the change.

    Args:
        request: The current request (not read here).
        response: The response being returned (not read here).
        session: Session holding ``logged_request``; committed, left open.
        logged_request: Record whose ``completed`` flag is set to True.
    """
    logged_request.completed = True
    session.commit()
if __name__ == "__main__":
    settings = {
        "tm.annotate_user": True,
        "debugtoolbar.hosts": "127.0.0.1/32",
        "debugtoolbar.active_panels": "performance",
    }
    with Configurator(settings=settings) as config:
        # Single route served by hello_world.
        config.add_route("hello", "/")
        config.add_view(hello_world, route_name="hello")

        config.include("pyramid_tm")
        config.include("pyramid_debugtoolbar")

        config.set_authentication_policy(MyAuthenticationPolicy(secret="1"))
        config.set_authorization_policy(ACLAuthorizationPolicy())

        # One engine-bound factory is shared by the request logger (through
        # the registry) and by the per-request ``request.session``.
        engine = create_engine("sqlite:///./testing.sqlite", echo=True)
        session_factory = sessionmaker(bind=engine)
        config.registry["session_factory"] = session_factory

        config.add_request_method(
            lambda r: get_tm_session(session_factory, r.tm),
            "session",
            reify=True,
        )
        config.add_request_method(get_user, "user", reify=True)
        config.add_subscriber(request_logger, BeforeTraversal)

        # Recreate the schema from scratch and seed four users.
        session = session_factory()
        Base.metadata.drop_all(session.bind)
        Base.metadata.create_all(session.bind)
        session.add_all([User(name=f"user {i}") for i in range(4)])
        session.commit()
        session.close()

        app = config.make_wsgi_app()

    server = make_server("127.0.0.1", 6543, app)
    server.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment