This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor | |
from apscheduler.jobstores.redis import RedisJobStore | |
from apscheduler.schedulers.background import BlockingScheduler | |
from apscheduler.triggers.cron import CronTrigger | |
from func import heart_beat | |
if __name__ == "__main__": | |
executors = { | |
'default': ThreadPoolExecutor(max_workers=10), | |
'processpool': ProcessPoolExecutor(max_workers=5) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
import trio | |
import anyio | |
async def acquire_release(lock): | |
async with lock: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import atexit | |
from contextlib import ExitStack | |
from anyio.from_thread import start_blocking_portal | |
exit_stack = ExitStack() | |
portal = exit_stack.enter_context(start_blocking_portal()) | |
atexit.register(exit_stack.close) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import time | |
import anyio | |
from anyio import create_tcp_listener | |
from anyio.abc import SocketAttribute, SocketStream | |
async def handle_connection(stream: SocketStream): | |
with stream: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import Path | |
# Create an empty file in the current directory whose name contains a
# non-ASCII character.
path = Path("MARKER-ö.txt")
path.touch()

# Look the file up again through a directory listing instead of reusing
# `path`, then change the permissions through the listed entry.
# NOTE(review): presumably this exercises the file name exactly as the
# filesystem reports it (which may differ in Unicode normalization from the
# literal above) -- confirm against the original report.
marker_entries = (
    entry for entry in Path.cwd().iterdir() if entry.name.startswith("MARKER-")
)
path2 = next(marker_entries)  # raises StopIteration if nothing matches
path2.chmod(0o777)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
__all__ = ["connect", "Connection", "Cursor"] | |
import sqlite3 | |
from collections.abc import Callable, Sequence | |
from functools import partial, wraps | |
from typing import Any | |
from anyio import CapacityLimiter, to_thread |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asyncio import run, sleep | |
from asyncpg import InterfaceError | |
from sqlalchemy.ext.asyncio import create_async_engine | |
async def main(): | |
engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb") | |
while True: | |
while True: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class _BlockingAsyncContextManager(AbstractContextManager): | |
_enter_future: Future | |
_exit_future: Future | |
_exit_event: Event | |
_exit_exc_info: Tuple[Optional[Type[BaseException]], Optional[BaseException], | |
Optional[TracebackType]] | |
def __init__(self, async_cm: AsyncContextManager[T_co], portal: 'BlockingPortal'): | |
self._async_cm = async_cm | |
self._portal = portal |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import as_completed | |
from anyio import start_blocking_portal, sleep | |
async def long_running_task(index):
    """Simulate a slow job identified by ``index``.

    Sleeps for 1, prints ``Task {index} running...``, then sleeps for
    ``index`` more before finishing.

    Args:
        index: Identifier used in the printed and returned messages; also
            passed to ``sleep`` as the second delay.

    Returns:
        The string ``'Task {index} return value'``.
    """
    initial_delay = 1
    await sleep(initial_delay)

    print(f'Task {index} running...')

    # The second delay scales with the index, so tasks started together
    # finish at different times.
    await sleep(index)

    outcome = f'Task {index} return value'
    return outcome
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
test.py:13: note: Revealed type is 'Any' | |
test.py:13: note: 'reveal_type' always outputs 'Any' in unchecked functions | |
test.py:15: note: Revealed type is 'Any' | |
test.py:15: note: 'reveal_type' always outputs 'Any' in unchecked functions | |
test.py:17: note: Revealed type is 'Any' | |
test.py:17: note: 'reveal_type' always outputs 'Any' in unchecked functions | |
test.py:20: error: No overload variant of "foo" matches argument type "float" | |
test.py:20: note: Possible overload variants: | |
test.py:20: note: def foo(bar: Union[bytes, str]) -> str | |
test.py:20: note: def foo(bar: int) -> int |
NewerOlder