Skip to content

Instantly share code, notes, and snippets.

View goodboy's full-sized avatar
🥱
seriously, don't be serious..

goodboy

🥱
seriously, don't be serious..
View GitHub Profile
async def run_all(*async_fns):
results = [None] * len(async_fns)
async def run_one(i, async_fn):
results[i] = await async_fn()
async with trio.open_nursery() as nursery:
for i, async_fn in enumerate(async_fns):
nursery.start_soon(run_one, i, async_fn)
import trio
# Process pool based on concurrent.futures
import concurrent.futures
class TrioProcessExecutor(trio.abc.AsyncResource):
def __init__(self, max_workers=None):
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
async def run_sync(self, fn, *args):
fut = self._executor.submit(fn, *args)
@njh
njh / aes67_wishlist.md
Last active January 22, 2025 16:11
AES67 Open Source Software Wishlist

AES67 (an open standard for high quality audio over IP) is becoming mainstream in the world of broadcast and professional audio industries, however there is a very limited amount of open source software available to interoperate with it. As a result we are often just replacing XLRs with Ethernet, without taking advantage of the possibilites the software give. While Virtual Soundcards enable some of this, native network implementations would allow greater flexibility.

This is my wishlist of things that would help change that. Hopefully one day it can be turned into a AES67 Awesome List.

As open source has resulted in very rapid evolution of the web, I believe the same is possible for professional/broadcast audio.

It is possible that some of this already exists and I just havn't found it yet. Please add a comment below if you know of something!

@attr.s
class TrioFuture:
_result = attr.ib(default=None)
_finished = attr.ib(default=attr.Factory(trio.Event))
def set_result(self, result):
assert not self._finished.is_set()
self._result = result
self._finished.set()
@vodik
vodik / chain.py
Last active October 11, 2017 18:31
chaining asyncio
import asyncio
async def poopyface(value):
if value == 3:
raise RuntimeError('Bail')
print(value)
def runall(*awaitables, loop=None):
from collections import deque, OrderedDict
import trio
class MultiGetQueue:
def __init__(self, max_size):
self._max_size = max_size
# {task: abort func}
# probably should make Task._abort_func public, or maybe even change
# the reschedule code to call the abort func in general?
self._get_wait = OrderedDict()
@zduey
zduey / iex.py
Created March 4, 2017 16:04
Simple Real-Time Stock Streaming with Bokeh
import io
import requests
import pandas as pd
from bokeh.models import ColumnDataSource, HoverTool, ResizeTool, SaveTool
from bokeh.models.widgets import TextInput, Button
from bokeh.plotting import figure, curdoc
from bokeh.layouts import row, widgetbox
TICKER = ""
@marketcalls
marketcalls / Supertrend V5.0.afl
Last active July 2, 2022 00:50
Supertrend V5.0
/* Done by Rajandran R */
/* Author of www.marketcalls.in */
/* Date : 05th Apr 2016 */
function GetSecondNum()
{
Time = Now( 4 );
Seconds = int( Time % 100 );
Minutes = int( Time / 100 % 100 );
Hours = int( Time / 10000 % 100 );
"""Hoverable Behaviour (changing when the mouse is on the widget by O. Poyen.
License: LGPL
"""
__author__ = 'Olivier POYEN'
from kivy.properties import BooleanProperty, ObjectProperty
from kivy.core.window import Window
class HoverBehavior(object):
@diyan
diyan / gui_automation_python.md
Last active December 4, 2023 14:48
Desktop GUI automation in Python

Desktop

UI Automation. Desktop. Python

GUI toolkit agnostic

autopy - simple, cross-platform GUI automation toolkit. MIT - https://github.com/msanders/autopy/

  • 432 stars, 102 forks, 2950 monthly downloads at 2015-05-13
  • GUI toolkit agnostic