This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bloop.models import unpack_from_dynamodb | |
from bloop.signals import object_loaded | |
from bloop.stream.coordinator import Coordinator | |
class TableStream: | |
def __init__(self, *, stream_arn, engine, model_selector): | |
""" | |
model_selector is a function that takes a record | |
and returns a model class. for example: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Support evaluation of a very limited subset of operations and literals. | |
EvalContext supports a subset of operations common across many languages, | |
while PythonEvalContext adds on Sets, Tuples, Slices, and the literals | |
True, False, and None. | |
.. code-block:: pycon | |
>>> from eval_gist import EvalContext |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pendulum | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import hashes, serialization | |
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey | |
from bloop import UUID, Binary, Column | |
from bloop.ext.pendulum import DateTime | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_marked(obj):
    """Return the set of marked columns for an object.

    Looks up ``obj`` in the module-level ``_obj_tracking`` mapping and
    copies its ``"marked"`` entry into a new ``set``, so callers can mutate
    the result without touching the tracked state.
    """
    tracked = _obj_tracking[obj]
    return set(tracked["marked"])
def get_marked_or_default(obj): | |
marked = get_marked(obj) | |
for column in obj.Meta.columns: | |
if column.default is not missing: | |
marked.add(column) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
_trace_installed = False | |
def install_trace_logger(): | |
global _trace_installed | |
if _trace_installed: | |
return | |
level = logging.TRACE = logging.DEBUG - 5 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import functools | |
import json | |
import secrets | |
import aiohttp | |
from concurrent.futures import ALL_COMPLETED | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bottom | |
from collections import defaultdict | |
_handlers = defaultdict(list) | |
def attach_handlers(client: bottom.Client) -> bottom.Client: | |
for event, funcs in _handlers.items(): | |
for func_builder in funcs: | |
client.on(event, func=func_builder(client)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Callable, Iterable | |
def noop(next_handler: Callable, *args: Any, **kwargs: Any) -> None:
    """Accept any arguments and do nothing.

    ``next_handler`` is never called, and the positional and keyword
    arguments are ignored.
    """
    return None
def process(handlers: Iterable[Callable], *args: Any, **kwargs: Any) -> None: | |
next_args = args | |
next_kwargs = kwargs |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bloop import Engine | |
from bloop.ext._lambda import unpack | |
from boto3 import Session | |
from my_models_script import MyModel | |
primary = Engine() | |
session = Session(region_name="us-east-1") | |
backup = Engine( | |
dynamodb=session.client("dynamodb"), |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, TypeVar, Generic, Type | |
T = TypeVar("T") | |
class Field(Generic[T]): | |
key: str | |
t: Type[T] | |
readonly: bool | |
def __init__(self, t: Type[T], readonly: bool=True) -> None: |