Last active
April 14, 2021 15:16
-
-
Save lukegre/d22ef2fd69812882cf5c5c6232de5b60 to your computer and use it in GitHub Desktop.
Quick place to store useful function decorators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def load_url_as_python_module(url, pkg_name=None, save_dir=None): | |
"""Imports a python file url as a package. | |
Downloads the file to local storage and imports into the global namespace. | |
Can be useful to quickly access gists and github files as functions. | |
Warning: this function could be VERY dangerous as a script could be | |
run on your machine without knowing what it does... be very careful | |
Args: | |
url (str): the url to the python file | |
pkg_name (str): if None, then the package name is determined | |
by the file name. If a string is given, then the file will | |
be imported as that name | |
save_dir (str): if not None, then the file will be saved to the | |
given directory. If None, then uses a temporary directory | |
Returns: | |
None: The function is added to the global namespace | |
""" | |
import sys | |
from importlib import util | |
from pooch import retrieve | |
from pathlib import Path as path | |
name = path(url).name | |
if pkg_name is not None: | |
pkg = pkg_name | |
else: | |
pkg = name.split('.')[0] | |
file_path = path(retrieve(url, None, fname=name, path=save_dir)) | |
sys.path.insert(0, str(file_path.parent)) | |
print(f'loading `{pkg}` into global namespace') | |
spec = importlib.util.spec_from_file_location(name, str(file_path)) | |
mod = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(mod) | |
globals().update({pkg: mod}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A few useful function for parallel and asynchronous processing. | |
BUGGY | |
""" | |
from threading import Thread | |
from functools import wraps | |
class ThreadWithReturnValue(Thread): | |
def __init__(self, group=None, target=None, name=None, | |
args=(), kwargs={}, Verbose=None): | |
Thread.__init__(self, group, target, name, args, kwargs) | |
self._return = None | |
def run(self): | |
if self._target is not None: | |
self._return = self._target(*self._args, **self._kwargs) | |
def result(self, *args): | |
Thread.join(self, *args) | |
return self._return | |
def __repr__(self): | |
while self.is_alive(): | |
return 'process is running silently in thread' | |
return 'process is done, get results with Thread[:]' | |
def __getitem__(self, index): | |
return self.result() | |
def asynk(func): | |
"""Function decorator, intended to make "func" run in a separate | |
thread (asynchronously). | |
Example: | |
.. code-block:: python | |
@async | |
def task1(): | |
do_something | |
@async | |
def task2(): | |
do_something_too | |
t1 = task1() | |
t2 = task2() | |
t1.join() | |
t2.join() | |
""" | |
@wraps(func) | |
def _inner(*args, **kwargs): | |
func_th = ThreadWithReturnValue(target=func, args=args, kwargs=kwargs) | |
func_th.start() | |
return func_th | |
return _inner | |
def synk(func): | |
"""Function decorator, intended to make "func" wait for threading async | |
finished. | |
Example: | |
.. code-block:: python | |
@async | |
def task1(): | |
do_something | |
@async | |
def task2(): | |
do_something_too | |
@sync | |
def task3(): | |
do_something_when_task1_and_task2_finished() | |
t1 = task1() | |
t2 = task2() | |
t3 = task3() # Only runs when task1 and task2 finished. | |
""" | |
@wraps(func) | |
def _inner(*args, **kwargs): | |
import threading | |
import time | |
while threading.activeCount() > 1: | |
time.sleep(1) | |
return func(*args, **kwargs) | |
return _inner | |
def parallel(func, n_jobs=8, verbose=True, threading=False): | |
""" | |
Parallel implementation for any function | |
It's quick, it's dirty, it might fail, but it's beautiful when it works. | |
This wrapper uses joblib in the backend to run scripts in parallel. | |
""" | |
from functools import wraps | |
@wraps(func) | |
def run_parallel(*args, n_jobs=n_jobs, verbose=verbose, **kwargs): | |
"""Runs the function through joblib. limited funcionality | |
""" | |
from joblib import Parallel, delayed, parallel_backend | |
from collections.abc import Iterable | |
def isiter(v): | |
not_string = not isinstance(v, str) | |
is_iter = isinstance(v, Iterable) | |
return not_string and is_iter | |
if not all([isiter(a) for a in args]): | |
raise ValueError( | |
'Note that this function has been parallelised. You can thus ' | |
'only pass arguements that are iterable to the function. ' | |
'These can be lists, tuples, etc. but not strings. ' | |
'All the iterable items must be the same length, if not, then ' | |
'an error will be raised. ') | |
lengths_args = set([len(a) for a in args if isiter(a)]) | |
not_iters = all([not isiter(v) for v in kwargs.values()]) | |
assert len(lengths_args) <= 1, 'All parallel inputs must have the same length on the 1st dimension' | |
assert not_iters, 'keyword arguements cannot be iterable' | |
len_arg = list(lengths_args)[0] | |
if len_arg < n_jobs: | |
n_jobs = len_arg | |
function = delayed(func) | |
parallel = Parallel( | |
verbose=verbose, | |
prefer='threading' if threading else 'processes', | |
n_jobs=n_jobs) | |
delayed_calls = [] | |
for arg in zip(*args): | |
delayed_calls += function(*arg, **kwargs), | |
return parallel(delayed_calls) | |
return run_parallel |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment