Skip to content

Instantly share code, notes, and snippets.

@amcgregor
Last active April 29, 2021 15:45
Show Gist options
  • Save amcgregor/a1496ee5c336b8bbffec86fe97993756 to your computer and use it in GitHub Desktop.
Save amcgregor/a1496ee5c336b8bbffec86fe97993756 to your computer and use it in GitHub Desktop.
Privacy-minded event data storage model with generic "actor" reference and string-based tag clustering.
"""Event storage.
These store "events", associated with "actors", which may be classified into one of several types, and be associated
with arbitrary tag-based metadata. Individual types of event may preserve additional data or metadata.
References:
* [Events as a Storage Mechanism](https://cqrs.wordpress.com/documents/events-as-storage-mechanism/) [PDF](https://dl.dropboxusercontent.com/u/9162958/CQRS/Events%20as%20a%20Storage%20Mechanism%20CQRS.pdf)
* [Building an Event Storage](https://cqrs.wordpress.com/documents/building-event-storage/) [PDF](https://dl.dropboxusercontent.com/u/9162958/CQRS/Building%20an%20Event%20Storage%20CQRS.pdf)
--
https://pypi.org/project/event-store/
https://bitbucket.org/drozdyuk/event-store
"""
import sys
import traceback
from abc import abstractmethod
from datetime import timedelta
from getpass import getuser
from socket import gethostname
from typing import ClassVar, Final, Optional, Type, TypeVar, Tuple

from bson import DBRef
from marrow.mongo import Document, Field, Index # Fundamental types.
from marrow.mongo.field import \
    Array, \
    Boolean, \
    Date, \
    Embed, \
    Integer, \
    ObjectId, \
    Float, \
    Reference, \
    Set, \
    String, \
    TTL, \
    Mapping, \
    Path, \
    PluginReference
from marrow.mongo.trait import Derived, Identified, Queryable
from marrow.schema.validate.network import IPAddress
from web.core.context import Context
# Names exported by `from <module> import *`.
__all__ = ['Event', 'UserEvent', 'RequestEvent', 'ErrorEvent']

# Type variables used to annotate alternate constructors: `A`/`E` stand for an instance of some Actor/Event
# subclass, while `AnyActor`/`AnyEvent` stand for the corresponding class object itself.
A = TypeVar('A', bound='Actor')
AnyActor = Type[A]
E = TypeVar('E', bound='Event')
AnyEvent = Type[E]
# Utility Decorators
class classproperty:
    """Descriptor which computes an attribute value from the owning class each time it is accessed.

    The decorated function is wrapped as a class method, so it receives the class as its only argument regardless
    of whether the attribute is looked up on the class or on one of its instances.
    """

    def __init__(self, callable):
        # Let `classmethod` take care of binding the getter to the class.
        self.fn = classmethod(callable)

    def __get__(self, inst, cls=None):
        bound = self.fn.__get__(inst, cls)
        return bound()
def populate(*a, **kw):
    """Build a class decorator which attaches pre-constructed instances of a class to the class itself.

    Each positional argument is expected to be a string; it serves as both the attribute name and the single value
    handed to the class when constructing the instance. Keyword arguments allow the attribute name (key) to differ
    from the construction value. A positional name replaces the value of an identically named keyword.
    """

    def inner(C):
        # Positional names map onto themselves.
        kw.update((name, name) for name in a)

        for attribute, value in kw.items():
            setattr(C, attribute, C(value))

        return C

    return inner
class Actor(Identified):
    """An abstract actor to associate with events.

    An application may be composed of components which may operate independent of user interaction. Tracking which
    subsystem is involved in certain events may be highly valuable, especially in forming event aggregates.

    This class is not ever meant to be persisted within MongoDB; it represents an object that is just document-alike
    enough to permit use with Reference fields. The DBRef or ID reference generated will not actually exist. Because
    de-serialization can not succeed, instances may be directly compared against a DBRef.
    """

    __collection__ = '_a'  # Abstract actors should use underscore-prefixed collection names which do not exist.

    id = String('_id')

    def __eq__(self, other):
        """Permit comparison between non-persisted Actors and the DBRef instances produced utilizing them.

        Two values are equal when they name the same collection and the string form of the other identifier matches
        this actor's identifier. Values which are neither an Actor nor a DBRef yield `NotImplemented`, so Python
        falls back to its default handling (ultimately "not equal") rather than raising mid-comparison, which would
        otherwise break membership tests against mixed-type containers.
        """
        if isinstance(other, Actor):
            collection = other.__collection__
        elif isinstance(other, DBRef):
            collection = other.collection
        else:
            return NotImplemented

        if collection != self.__collection__:
            return False

        return str(other.id) == self.id

    def __ne__(self, other):
        """Inverse of `__eq__`.

        NOTE(review): spelled out explicitly in case a base class supplies its own `__ne__` which would otherwise
        disagree with the equality defined here; confirm against `Identified`.
        """
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        """Hash on the same collection and identifier pair that equality between actors is based on.

        Overriding `__eq__` alone would leave instances unhashable. No attempt is made to match the hash of an
        equal DBRef, so mixed Actor/DBRef sets will not de-duplicate.
        """
        return hash((self.__collection__, str(self.id)))
class Automaton(Actor):
    """A named automated process acting within, or on behalf of, the application.

    A few illustrative (not exhaustive) examples:

    * `Automaton('Google')` — An API provided to a dedicated third-party.
    * `Automaton('cron')` — An event as a result of a periodic scheduled task.
    * `Automaton('migration')` — The result of some form of start-up "migration" task.
    """

    __collection__ = '_A'
class LocalUser(Actor):
    """A more concrete kind of actor: an interactive user working at a desktop session."""

    __collection__ = '_u'

    @classmethod
    def whoami(cls: AnyActor) -> A:
        """Build an instance identifying the current user and host, in the form `user@hostname`."""
        user = getuser()
        host = gethostname()
        return cls(f'{user}@{host}')
class Event(Identified, Derived, Queryable):
    """Abstract foundation shared by every recorded event.

    Declares the database metadata and the minimum data model common to all event types. Every stored event carries:

    * An ObjectID identifier, which embeds a second-accurate timestamp usable as the "version", and which is also
      identifiable to a machine, a process on that machine, and a sequence number within that process.
    * An event class identifier, consulted during de-serialization to select the correct Python representation.
    * A redaction time: a point a defined period after creation at which the record will be deleted.
    * A set of "actors" associated with the event; either entities such as users, or abstract Actors.
    * A set of "tags" used to filter and group events into "aggregates".

    Actors and tags are both indexed to permit efficient queries such as "all events for user X".

    This base class is abstract and carries no meaning of its own; persist subclasses instead. Applications may
    define further "generic actor constants" beyond the two provided here:

    `USER` — "Local" access attributable to an interactive user, such as via a REPL shell.
    `SYSTEM` — The application itself performed this action, e.g. fixture unpacking, migrations, maintenance...
    """

    # Generic actor constants.

    USER = LocalUser.whoami()  # The account the process runs as, and the host it runs on.
    SYSTEM = Actor('system')  # The application itself acted; the event _id will be more specific.

    # Database Metadata

    __database__: ClassVar[str] = 'default'  # Which database holds event storage.
    __collection__: ClassVar[str] = 'events'  # Which collection tracked events are written to.
    __timeline__: ClassVar[Optional[str]] = None  # The timeline this event appears within, if any.

    # Field Definitions

    redact = TTL(default=timedelta(days=365), assign=True, write=False)
    actors = Set(kind=Reference(concrete=True), default=set)
    tags = Set(kind=Field(), default=set)

    # Index Definitions

    actor = Index('actors')
    tag = Index('tags')

    @classmethod
    @abstractmethod
    def new(cls: AnyEvent, context: Context) -> E:
        """Build a new instance of this event type from a WebCore request context."""
        ...
class UserEvent(Event):
    """An event intended to be seen by end users.

    Other projects subclass this to provide their own events, such as OrderEvent. These events may be associated with
    an action of some kind: code run when the user "activates" or performs the "default action" associated with the
    event, similar to desktop notifications.
    """

    __timeline__ = 'user'  # Events intended for users should appear on a user's dashboard and notifications.

    sticky = Boolean(default=False, assign=True)  # Display as flash / in list even if seen.
    seen = Date(default=None, assign=True)  # When was the notification (but not detail) seen?
    read = Date(default=None, assign=True)  # When was the detail seen?
    acted = Date(default=None, assign=True)  # When did the user perform the event's action? (If any.)

    @classmethod
    def new(cls: AnyEvent, context: Context) -> E:
        """Construct a new, empty event instance; the request context is currently unused."""
        return cls()

    @classproperty
    def unseen(cls: AnyEvent):
        """Query for events of this type whose notification has not yet been seen."""
        # The getter receives the class, not an instance; the query must go through `cls`.
        return cls.find(seen=None)

    @classproperty
    def unread(cls: AnyEvent):
        """Query for events of this type whose detail has not yet been read."""
        return cls.find(read=None)

    @abstractmethod
    def __call__(self, context: Context):
        """Perform some work as a result of the event being "acted upon"."""
        ...
class SecurityEvent(UserEvent):
    """A user-visible event concerning the security of a user account.

    What happened is recorded as `kind`, which must be one of:

    * `create` — An account was created. An end-user "sign up" process.
    * `authenticate` — User authentication was attempted.
    * `forgot` — User attempted to initiate password reset.
    * `reset` — The user's password has been changed due to reset.
    * `change` — The user has changed their password normally.
    * `update` — The user has altered their account's identifier, e.g. e-mail address.
    """

    __timeline__ = 'security'  # No need to pester users with these, though they should remain open to inspection.

    # XXX: Make the choices pull from a plugin registry of action handlers?
    kind = String(choices={'create', 'authenticate', 'forgot', 'reset', 'change', 'update'}, required=True)
    origin = String(validator=IPAddress)
    success = Boolean(default=False, assign=True)

    @classmethod
    def new(cls: AnyEvent, context: Context, identity) -> E:
        """Build a new instance by delegating to the parent constructor; `identity` is accepted but not yet used."""
        return super().new(context)

    def __call__(self, context: Context):
        """Carry out whatever action suits this particular kind of security event."""
        # XXX: Similar to above, use a registry here?
        ...
class RequestEvent(Event):
    """Raw data of all requests and responses.

    Culled on a shorter time-frame than normal data for privacy (and storage size) reasons.
    """

    __timeline__ = 'audit'  # Potentially sensitive events should only be visible to administrators during an audit.

    WSGI_EXCLUDE = {  # Not every WSGI environment variable can be persisted.
        'wsgi.input', 'wsgi.errors', 'wsgi.file_wrapper',  # File-like objects included in the WSGI environment.
        'wsgi.input_terminated',  # The body will always have been read.
        'beaker.get_session', 'beaker.cache', 'beaker.session',  # Functional accessors.
        'paste.registry', 'webob.adhoc_attrs', 'webob._parsed_query_vars',  # Superglobal, attribute support, cache.
        'paste.throw_errors', 'paste.evalexception.debug_count', 'weberror.evalexception',  # Debugging.
        'web.translator', 'web.controller', 'web.app',  # Framework components.
    }

    HEADER_EXCLUDE = {  # HTTP headers to exclude from capture.
        'Connection',  # To keep-alive or not is not overly relevant.
        'Cookie',  # Cookies will contain sensitive (identifying) information.
        'X-Generation-Time',  # Typical performance headers.
        'Generation-Time',
    }

    redact = Event.redact.adapt(default=timedelta(days=30))  # These are identifiable to a specific user.

    status = Integer()  # HTTP response code.
    environ = Embed()  # WSGI environment, including request headers.
    endpoint = PluginReference(explicit=True)  # Dot-colon path to the final endpoint.
    path = Path()  # The requested URI path resolving to this endpoint.
    duration = Float()  # In milliseconds.
    request = String()  # Request body.
    headers = Embed()  # Request headers.
    response = String()  # Response body.
    session = Embed()  # Complete session contents.

    @classmethod
    def new(cls: AnyEvent, context: Context) -> E:
        """Construct a new request event from a WebCore request context.

        Captures the response status and body, the request body, the filtered WSGI environment and request headers,
        the resolved endpoint and its path, and, when timing milestones are available on the context, the duration
        between the `init` and `done-` milestones.
        """

        def suitable(key):
            # Drop explicitly excluded keys, private (underscore-prefixed) keys, and entirely upper-case keys.
            return key not in cls.WSGI_EXCLUDE and not key.startswith('_') and key != key.upper()

        endpoint = context.path.current

        # Calculate the request duration if the analytics extension is enabled.
        # NOTE(review): this is the plain difference of two milestone values; confirm the unit matches the
        # "milliseconds" documented on the `duration` field.
        duration = None
        if 'milestone' in context:
            milestone = context.milestone
            if 'done-' in milestone and 'init' in milestone:
                duration = milestone['done-'] - milestone['init']

        return cls(
            status=context.response.status_int,
            environ={k: v for k, v in context.environ.items() if suitable(k)},
            endpoint=endpoint.handler,
            path=endpoint.path,
            duration=duration,
            request=context.request.body,
            headers={k: v for k, v in context.request.headers.items() if k not in cls.HEADER_EXCLUDE},
            response=context.response.body,
            session={},  # TODO
        )
class ErrorEvent(Event):
    """Capture of exception-related data.

    The intent is to capture:

    * The exception class (search) and arguments (reconstruction).
    * The code location details such as module, line, function, etc.
    * Traceback information, including variables passed as arguments at each call site a la pytest assert.
    """

    __timeline__ = 'audit'  # As per general request events, errors may be sensitive and are limited to administrators.

    class ErrorFrame(Document):
        """A single entry of a recorded stack trace."""

        filename = Path()  # file...
        line = Integer()  # line number...
        name = String()  # in...
        local = Mapping()  # Optional frame-local variable scope.

    redact = Event.redact.adapt(default=timedelta(days=30))  # These are identifiable to a specific user.

    exception = PluginReference(explicit=True)  # Dot-colon reference to the specific exception class.
    arguments = Array()  # Arguments to the exception.
    message = String()  # The string representation of the exception.
    trace = Array(kind=Embed(kind=ErrorFrame))  # The stack trace.

    @classmethod
    def new(cls: AnyEvent, context: Context, e: Optional = None, *, trace: bool = True, capture: bool = False) -> E:
        """Construct a new ErrorEvent representation from the currently active or a specific exception instance.

        The exception may be omitted (the active exception is used), given as a `sys.exc_info()`-style 3-tuple, or
        given as an exception instance.

        If `trace` is truthy, the traceback frame summary will be persisted with the error event. Additionally, if the
        `capture` argument is truthy, the contents of the individual stack frame local scopes will also be serialized.
        These two options are only effective if a traceback is available for the exception.

        Raises `TypeError` if no exception was passed and none is currently being handled.
        """
        # Grab the current exception, or use the one provided.
        if e is None:
            t, e, tb = sys.exc_info()
        elif isinstance(e, tuple) and len(e) == 3:
            t, e, tb = e
        elif isinstance(e, BaseException):
            t = type(e)  # Always describe the exception given, never whichever one happens to be active.
            tb = getattr(e, '__traceback__', None)

            if tb is None:
                # Only borrow the active traceback if it belongs to this very exception.
                _, active, active_tb = sys.exc_info()
                if active is e:
                    tb = active_tb
        else:
            t, tb = type(e), None

        if e is None:
            raise TypeError(
                f"{cls.__name__} must be constructed within the context of an active exception, or be passed one.")

        # Instantiate, and if not capturing a trace, we're done here.
        inst = cls(exception=t, arguments=e.args, message=str(e))

        if tb is None or not trace:
            return inst

        # Prepare a complete traceback, one ErrorFrame per frame summary.
        # NOTE(review): captured locals arrive as a mapping of variable name to repr() string; confirm the `Mapping`
        # field accepts that shape.
        summary = traceback.StackSummary.extract(traceback.walk_tb(tb), capture_locals=capture)
        inst.trace = [
            cls.ErrorFrame(filename=fs.filename, line=fs.lineno, name=fs.name, local=fs.locals if capture else None)
            for fs in summary
        ]

        return inst

    @property
    def exc(self):
        """Reconstruct an exception instance from the recorded class and arguments."""
        return self.exception(*self.arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment