Skip to content

Instantly share code, notes, and snippets.

@lukegre
Last active April 14, 2021 15:16
Show Gist options
  • Save lukegre/d22ef2fd69812882cf5c5c6232de5b60 to your computer and use it in GitHub Desktop.
Save lukegre/d22ef2fd69812882cf5c5c6232de5b60 to your computer and use it in GitHub Desktop.
Quick place to store useful function decorators
def load_url_as_python_module(url, pkg_name=None, save_dir=None):
    """Download a python file from a url and import it as a module.

    The file is fetched with ``pooch.retrieve`` (without a hash check),
    executed, and the resulting module object is bound under ``pkg_name``
    in the global namespace of the module that defines this function.
    Can be useful to quickly access gists and github files as functions.

    Warning:
        This function could be VERY dangerous: the downloaded script is
        executed on your machine without any inspection. Only use it with
        urls you trust.

    Args:
        url (str): the url to the python file
        pkg_name (str): if None, the package name is the part of the file
            name before the first dot. If a string is given, the file is
            imported under that name.
        save_dir (str): if not None, the file is saved to the given
            directory. If None, the location is chosen by
            ``pooch.retrieve`` (its default cache folder).

    Returns:
        module: the imported module. It is also stored under ``pkg_name``
        in this module's global namespace.

    Raises:
        ImportError: if no import spec/loader can be created for the
            downloaded file (e.g. it has no recognised python suffix).
    """
    import sys
    from importlib import util
    from pathlib import Path

    from pooch import retrieve

    name = Path(url).name
    pkg = pkg_name if pkg_name is not None else name.split('.')[0]

    # known_hash=None: the download is not verified against a checksum
    file_path = Path(retrieve(url, None, fname=name, path=save_dir))

    # make the download folder importable, but do not pile up duplicates
    parent = str(file_path.parent)
    if parent not in sys.path:
        sys.path.insert(0, parent)

    print(f'loading `{pkg}` into global namespace')

    # the module is named after the package, not the file name with suffix
    spec = util.spec_from_file_location(pkg, str(file_path))
    if spec is None or spec.loader is None:
        raise ImportError(f'cannot import `{file_path}` as a python module')

    mod = util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    globals()[pkg] = mod
    return mod
"""
A few useful functions for parallel and asynchronous processing.
BUGGY
"""
from threading import Thread
from functools import wraps
class ThreadWithReturnValue(Thread):
    """A ``Thread`` that keeps the value returned by its target.

    The return value of ``target(*args, **kwargs)`` is stored when the
    thread runs and can be collected with :meth:`result` or by indexing
    the thread object (``thread[:]``). If the target raises, nothing is
    stored and the result stays ``None``.

    Args:
        group, target, name, args, kwargs: passed on to ``Thread``.
        Verbose: unused; accepted only for backward compatibility.
        daemon (bool): keyword-only, passed on to ``Thread``.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, Verbose=None, *, daemon=None):
        # ``None`` instead of a shared mutable ``{}`` default
        if kwargs is None:
            kwargs = {}
        super().__init__(group, target, name, args, kwargs, daemon=daemon)
        self._return = None

    def run(self):
        """Call the target and remember what it returned."""
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def result(self, *args):
        """Join the thread and return the stored value.

        Args:
            *args: forwarded to ``Thread.join`` (i.e. an optional timeout).

        Returns:
            The target's return value, or ``None`` if nothing has been
            stored (yet), e.g. when the join timed out.
        """
        self.join(*args)
        return self._return

    def __repr__(self):
        if self.is_alive():
            return 'process is running silently in thread'
        return 'process is done, get results with Thread[:]'

    def __getitem__(self, index):
        """Return the result; ``index`` is ignored (``thread[:]`` idiom)."""
        return self.result()
def asynk(func):
    """Decorator that runs ``func`` in a separate thread on every call.

    Calling the decorated function starts a ``ThreadWithReturnValue``
    with the given arguments and immediately returns that thread object
    instead of the function's result.

    Example:

    .. code-block:: python

        @asynk
        def task1():
            do_something

        @asynk
        def task2():
            do_something_too

        t1 = task1()
        t2 = task2()
        t1.join()
        t2.join()
    """
    def _spawn(*args, **kwargs):
        worker = ThreadWithReturnValue(
            target=func,
            args=args,
            kwargs=kwargs,
        )
        worker.start()
        return worker

    # copy name/docstring of ``func`` onto the launcher
    return wraps(func)(_spawn)
def synk(func):
    """Decorator that delays ``func`` until all other threads are done.

    Before every call, the wrapper polls ``threading.active_count()`` once
    per second and only calls ``func`` when the calling thread is the
    single thread left alive. Note that every live thread is counted, not
    only those started through ``asynk``.

    Example:

    .. code-block:: python

        @asynk
        def task1():
            do_something

        @asynk
        def task2():
            do_something_too

        @synk
        def task3():
            do_something_when_task1_and_task2_finished()

        t1 = task1()
        t2 = task2()
        t3 = task3()  # Only runs when task1 and task2 finished.
    """
    @wraps(func)
    def _inner(*args, **kwargs):
        import threading
        import time

        # ``active_count`` replaces the deprecated ``activeCount`` alias
        while threading.active_count() > 1:
            time.sleep(1)
        return func(*args, **kwargs)

    return _inner
def parallel(func, n_jobs=8, verbose=True, threading=False):
    """Wrap ``func`` so that it is mapped over its arguments with joblib.

    It's quick, it's dirty, it might fail, but it's beautiful when it works.
    The returned function takes one iterable per positional argument of
    ``func``; the iterables are zipped and ``func`` is called once per
    element. Keyword arguments are passed unchanged to every call.

    Args:
        func (callable): the function to run in parallel
        n_jobs (int): default number of joblib workers
        verbose (bool or int): default joblib verbosity
        threading (bool): prefer joblib's threading backend if True,
            otherwise processes

    Returns:
        callable: ``run_parallel(*args, n_jobs=n_jobs, verbose=verbose,
        **kwargs)`` returning the list of results.
    """
    from collections.abc import Iterable
    from functools import wraps

    def _is_iterable(value):
        """True for any iterable that is not a string."""
        return isinstance(value, Iterable) and not isinstance(value, str)

    @wraps(func)
    def run_parallel(*args, n_jobs=n_jobs, verbose=verbose, **kwargs):
        """Runs the function through joblib. Limited functionality.

        Raises:
            ValueError: if a positional argument is not a (non-string)
                iterable, or if no positional argument is given.
            AssertionError: if the iterables differ in length or a
                keyword argument is iterable.
        """
        if not all(_is_iterable(a) for a in args):
            raise ValueError(
                'Note that this function has been parallelised. You can thus '
                'only pass arguements that are iterable to the function. '
                'These can be lists, tuples, etc. but not strings. '
                'All the iterable items must be the same length, if not, then '
                'an error will be raised. ')

        # every iterable must support len()
        lengths = {len(a) for a in args}

        # Raised explicitly instead of via ``assert`` so the checks survive
        # ``python -O``; the AssertionError type is kept for compatibility.
        if len(lengths) > 1:
            raise AssertionError(
                'All parallel inputs must have the same length on the 1st '
                'dimension')
        if any(_is_iterable(v) for v in kwargs.values()):
            raise AssertionError('keyword arguements cannot be iterable')

        if not lengths:
            raise ValueError(
                'At least one iterable positional argument is required')

        (n_items,) = lengths
        if n_items == 0:
            # nothing to compute; also avoids asking for zero workers
            return []

        # imported late so input errors surface before the heavy import
        from joblib import Parallel, delayed

        # never start more workers than there are items
        if n_jobs is not None and n_items < n_jobs:
            n_jobs = n_items

        executor = Parallel(
            verbose=verbose,
            prefer='threading' if threading else 'processes',
            n_jobs=n_jobs)
        task = delayed(func)
        return executor([task(*items, **kwargs) for items in zip(*args)])

    return run_parallel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment