Skip to content

Instantly share code, notes, and snippets.

View goodboy's full-sized avatar
🥱
seriously, don't be serious..

goodboy

🥱
seriously, don't be serious..
View GitHub Profile
class EnterTimeout:
def __init__(self, async_cm, timeout):
self._async_cm = async_cm
self._timeout = timeout
async def __aenter__(self):
    """Enter the wrapped async context manager under a timeout.

    Returns whatever the wrapped manager's ``__aenter__`` returns.

    NOTE(review): ``fail_after`` is presumably ``trio.fail_after``; its
    import is not visible in this fragment -- confirm.
    """
    # The constructor stores the value as ``_timeout``; reading
    # ``self.timeout`` (no underscore) raised AttributeError on every entry.
    with fail_after(self._timeout):
        return await self._async_cm.__aenter__()
async def __aexit__(self, *args):
import trio
import sys
import time
import httpx
from outcome import Error
import traceback
# Can't use PySide2 currently because of
# https://bugreports.qt.io/projects/PYSIDE/issues/PYSIDE-1313
from PyQt5 import QtCore, QtWidgets
@guilledk
guilledk / aqueue.py
Last active February 20, 2020 17:51
Actor queue implementation, with history, and pub-sub
import math
import trio
class AsyncQueue:
    """Queue state: a history list plus the two ends of a Trio memory channel."""

    def __init__(self):
        """Create an empty history and open a channel with an infinite buffer."""
        self.history = []
        # trio.open_memory_channel returns a (send, receive) pair; math.inf
        # as the buffer size means the channel is unbounded.
        send_end, receive_end = trio.open_memory_channel(math.inf)
        self.inport = send_end
        self.outport = receive_end
@maxidorius
maxidorius / notes.md
Last active November 16, 2023 00:05
Notes on privacy and data collection of Matrix.org

Notes on privacy and data collection of Matrix.org


This version of the document is no longer canonical. You can find the canonical version hosted at GitLab and GitHub.

PART 2 IS OUT, INCLUDING THE DISCLOSURE OF A GLOBAL FEDERATION DATA LEAK, AND THE ANATOMY OF A GDPR DATA REQUEST HANDLED BY MATRIX.ORG. SEE THE REPOS ABOVE.

from kivy.factory import Factory as F
from kivy.app import App
from kivy.lang import Builder
KV = '''
<ButtonLayout>:
on_press: print("here or there")
Label:
class UniversalQueue:
    """Wrap a ``trio.Queue`` together with a ``trio.BlockingTrioPortal``.

    Offers one getter for async callers and one that goes through the portal.
    """

    def __init__(self, *args, **kwargs):
        """Build the inner queue from the given arguments, then the portal."""
        inner_queue = trio.Queue(*args, **kwargs)
        self._queue = inner_queue
        self._portal = trio.BlockingTrioPortal()

    async def trio_get(self):
        """Await the inner queue's ``trio_get`` and return its result.

        NOTE(review): the inner object is a ``trio.Queue``; confirm that it
        really provides ``trio_get`` (as opposed to ``get``).
        """
        item = await self._queue.trio_get()
        return item

    def thread_get(self):
        """Run the inner queue's ``trio_get`` through the portal, synchronously.

        NOTE(review): presumably intended for callers outside the Trio
        thread -- confirm against the portal's documentation.
        """
        fetch = self._queue.trio_get
        return self._portal.run(fetch)
@Nadrieril
Nadrieril / shell.nix
Last active May 6, 2025 10:11
Building LineageOS on NixOS
# I used this shell.nix to build LineageOS 13.0 for my maguro (Samsung Galaxy Nexus GSM) phone
# The build instructions for normal Linuxes are here: https://wiki.lineageos.org/devices/maguro/build
# For NixOS, follow those instructions but skip anything related to installing packages
# Detailed instructions:
# cd into an empty directory of your choice
# copy this file there
# in nix-shell:
# $ repo init -u https://github.com/LineageOS/android.git -b cm-13.0
# $ repo sync
# $ source build/envsetup.sh
@nonsleepr
nonsleepr / demo.py
Created July 5, 2018 21:13
Multiplexer for trio: my take on python-trio/trio#467
import trio
from multiplexer import Multiplexer
async def reader(mx, key, timeout=100):
print(f'Waiting for "{key}"...')
try:
with trio.fail_after(timeout):
value = await mx[key]
print(f'Got value "{value}" for key {key}')
@vodik
vodik / observable.py
Last active July 30, 2018 18:48
New observable
import asyncio
import collections
from collections.abc import AsyncGenerator
class Subject(AsyncGenerator):
def __init__(self, *, loop=None):
if loop is None:
self._loop = asyncio.get_event_loop()
else:
@timkpaine
timkpaine / iex.py
Last active December 9, 2022 19:46
Simple Real-Time Stock Streaming with Bokeh
'''
To run:
python -m bokeh serve iex.py
'''
import io