Skip to content

Instantly share code, notes, and snippets.

View goodboy's full-sized avatar
🥱
seriously, don't be serious..

goodboy

🥱
seriously, don't be serious..
View GitHub Profile
@basak
basak / aioevent.md
Created April 6, 2021 02:27
Trio broadcast implementation with "slowest consumer" backpressure

I've been using this code "in production" for a while now. It dates back to https://groups.google.com/g/python-tulip/c/J7tCcBU5TPA/m/NM7iBhhhEAAJ except that I converted it to Trio a while ago. It is intended to be lossless - if desired, you can ensure to catch all messages since you start to listen, without losing anything. Producers are blocked on send() until the slowest consumer has received the message.

Since new consumers won't receive messages from before they began to listen, the point at which a consumer "begins listening" is important. This happens when the async iterator is created - ie. when the for loop runs the implicit aiter(). If you do this as the first thing in a coroutine, you might expect all message following a nursery.start_soon() call starting that coroutine to be picked up. But in practice, the for loop won't run the implicit aiter() until some time later, and so you won't see messages sent prior to that point. To avoid this, you must all aiter() yourself and pass that in, o

@richardsheridan
richardsheridan / map_concurrently_in_subthread_trio.py
Last active January 2, 2023 22:14
map_concurrently_in_subthread_trio
import queue
import random
from functools import partial
from time import sleep, perf_counter
import trio
CONCURRENCY_LIMIT = 8
limiter = trio.CapacityLimiter(CONCURRENCY_LIMIT)
@iamzoltan
iamzoltan / pussthescrapper.py
Last active July 15, 2021 21:38
basic scrapping
import httpx
import feedparser
from bs4 import BeautifulSoup
from sqlalchemy.exc import IntegrityError
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime
# create db engine
engine = create_engine('sqlite:///crypto_articles.db', echo = True)
meta = MetaData()
import trio
import functools
class ReplicatedBroadcastFailed(Exception):
pass
class ReplicatedBroadcast:
def __init__(self, nursery, async_iterable):
self._nursery = nursery
@richardsheridan
richardsheridan / universal_queue.py
Last active November 7, 2022 04:09 — forked from njsmith/universal-trio-queue.py
Universal cross-thread unbuffered queue for trio, asyncio, and threads
# Rough draft of a Queue object that can be used simultaneously from
# sync threads + *multiple* trio and asyncio threads, all at once.
#
# If you don't have multiple threads each doing their own separate calls to Xio.run,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
#
# Currently, the test will provide 94% coverage given sufficient timeout. The
# remaining are (apparently rare) races and the durable aio shielding.
import random
from __future__ import annotations
import cProfile
import pstats
from collections import deque
from dataclasses import dataclass, field
from random import Random
from timeit import default_timer
from typing import Awaitable, Callable, Deque, Iterable, List, Optional, Protocol, Union
from __future__ import annotations
import dataclasses
import math
from collections.abc import Callable, Coroutine, Generator
from typing import TYPE_CHECKING
import trio.lowlevel
from typing_extensions import ParamSpec, Self, TypeVar, overload