Skip to content

Instantly share code, notes, and snippets.

View goodboy's full-sized avatar
🥱
seriously, don't be serious..

goodboy

🥱
seriously, don't be serious..
View GitHub Profile
@zduey
zduey / iex.py
Created March 4, 2017 16:04
Simple Real-Time Stock Streaming with Bokeh
import io
import requests
import pandas as pd
from bokeh.models import ColumnDataSource, HoverTool, ResizeTool, SaveTool
from bokeh.models.widgets import TextInput, Button
from bokeh.plotting import figure, curdoc
from bokeh.layouts import row, widgetbox
TICKER = ""
from collections import deque, OrderedDict
import trio
class MultiGetQueue:
def __init__(self, max_size):
self._max_size = max_size
# {task: abort func}
# probably should make Task._abort_func public, or maybe even change
# the reschedule code to call the abort func in general?
self._get_wait = OrderedDict()
@vodik
vodik / chain.py
Last active October 11, 2017 18:31
chaining asyncio
import asyncio
async def poopyface(value):
if value == 3:
raise RuntimeError('Bail')
print(value)
def runall(*awaitables, loop=None):
@attr.s
class TrioFuture:
_result = attr.ib(default=None)
_finished = attr.ib(default=attr.Factory(trio.Event))
def set_result(self, result):
assert not self._finished.is_set()
self._result = result
self._finished.set()
@njh
njh / aes67_wishlist.md
Last active January 22, 2025 16:11
AES67 Open Source Software Wishlist

AES67 (an open standard for high quality audio over IP) is becoming mainstream in the world of broadcast and professional audio industries, however there is a very limited amount of open source software available to interoperate with it. As a result we are often just replacing XLRs with Ethernet, without taking advantage of the possibilites the software give. While Virtual Soundcards enable some of this, native network implementations would allow greater flexibility.

This is my wishlist of things that would help change that. Hopefully one day it can be turned into a AES67 Awesome List.

As open source has resulted in very rapid evolution of the web, I believe the same is possible for professional/broadcast audio.

It is possible that some of this already exists and I just havn't found it yet. Please add a comment below if you know of something!

import trio
# Process pool based on concurrent.futures
import concurrent.futures
class TrioProcessExecutor(trio.abc.AsyncResource):
def __init__(self, max_workers=None):
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
async def run_sync(self, fn, *args):
fut = self._executor.submit(fn, *args)
async def run_all(*async_fns):
results = [None] * len(async_fns)
async def run_one(i, async_fn):
results[i] = await async_fn()
async with trio.open_nursery() as nursery:
for i, async_fn in enumerate(async_fns):
nursery.start_soon(run_one, i, async_fn)
@timkpaine
timkpaine / iex.py
Last active December 9, 2022 19:46
Simple Real-Time Stock Streaming with Bokeh
'''
To run:
python -m bokeh serve iex.py
'''
import io
@vodik
vodik / observable.py
Last active July 30, 2018 18:48
New observable
import asyncio
import collections
from collections.abc import AsyncGenerator
class Subject(AsyncGenerator):
def __init__(self, *, loop=None):
if loop is None:
self._loop = asyncio.get_event_loop()
else:
@nonsleepr
nonsleepr / demo.py
Created July 5, 2018 21:13
Multiplexer for trio: my take on python-trio/trio#467
import trio
from multiplexer import Multiplexer
async def reader(mx, key, timeout=100):
print(f'Waiting for "{key}"...')
try:
with trio.fail_after(timeout):
value = await mx[key]
print(f'Got value "{value}" for key {key}')