This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import wraps | |
class A: | |
@classmethod | |
def method(cls): | |
print('cls = %s' % cls.__name__) | |
class B(A): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
async def error_after(delay): | |
await trio.sleep(delay) | |
raise Exception('foo') | |
async def somegenerator(): | |
try: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast | |
import re | |
import sys | |
from importlib.machinery import SourceFileLoader | |
from importlib.abc import MetaPathFinder | |
from importlib.util import decode_source, cache_from_source | |
from typing import Iterable | |
from unittest.mock import patch | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial | |
from inspect import iscoroutinefunction | |
import pytest | |
import hyperio | |
try: | |
from async_generator import isasyncgenfunction | |
except ImportError: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@pytest.fixture(scope='session', autouse=True) | |
def sqlalchemy_thread_check(connection): | |
"""Raise an exception is SQL is executed in the event loop thread in the application code.""" | |
def check_thread(*args): | |
if asyncio._get_running_loop(): | |
frames = [f for f in extract_stack()[:-1] if 'ebserver' in f.filename] | |
if frames: | |
raise RuntimeError(f'SQL executed in the event loop thread ' | |
f'(from {frames[-1].filename}:{frames[-1].lineno})') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread | |
counter = 0 | |
threads = [] | |
def increment(): | |
global counter | |
for _ in range(500000): | |
counter += 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import inspect | |
import threading | |
import trio | |
from trio import run_sync_in_worker_thread | |
from trio.hazmat import ( | |
spawn_system_task, wait_task_rescheduled, reschedule, current_task, Error, Value, Abort) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
from asyncio import get_event_loop, gather, Task, wait | |
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor | |
from functools import wraps, partial | |
from typing import Callable, Union, TypeVar, Awaitable, List, Dict | |
T_Retval = TypeVar('T_Retval') | |
class MultiError(Exception): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import trio | |
from wampproto.impl.client.trio import TrioWAMPClient | |
def procedure_handler(name): | |
return 'Hello, {}!'.format(name) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[metadata] | |
name = wampproto | |
description = WAMP (Web Application Message Protocol) state-machine based protocol implementation | |
long_description = file: README.rst | |
author = Alex Grönholm | |
author_email = [email protected] | |
url = https://github.com/agronholm/wampproto | |
license = MIT | |
license_file = LICENSE | |
keywords = wamp, websockets |