Skip to content

Instantly share code, notes, and snippets.

@ottomata
Created February 7, 2022 22:22
Show Gist options
  • Save ottomata/4f7ff71f107943e0c7025c35518ac3d3 to your computer and use it in GitHub Desktop.
Save ottomata/4f7ff71f107943e0c7025c35518ac3d3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Imports and calls a python function.
call.py is a standalone python module and
CLI, and should not import any dependencies
unless they are available in any standard
python environment.
Usage:
call.py 'my.package:callable' arg1 arg2
"""
from typing import List
import argparse
import json
import os
import sys
import re
import importlib
import logging
import subprocess
from glob import glob
# Module-level logger used by the functions in this file; it is named after
# the script ('call.py') rather than after __name__.
log = logging.getLogger('call.py')
def get_possible_sys_paths(env_prefix: str) -> List[str]:
    """
    Returns candidate sys.path lib dirs for the python environment
    rooted at env_prefix.

    If env_prefix exists, its lib directory is searched for existing
    python* directories. Otherwise the version of the currently running
    python is used to infer the directory names.

    In this way, possible sys.path entries can be returned even if the
    prefix doesn't exist (yet). E.g. in the case of Spark on YARN, it may
    be that env_prefix does not exist on the master (where call.py is
    run), but does on the executors via the spark-submit --archives
    option.

    :param env_prefix:
        Path to python environment prefix.
    :return:
        For every python lib dir: the dir itself, followed by its
        site-packages and lib-dynload subdirectories. Each entry
        appears exactly once.
    """
    sub_dirs = ('site-packages', 'lib-dynload')

    if os.path.exists(env_prefix):
        # Keep only real directories (a stray file such as
        # lib/python-notes.txt also matches the pattern), and sort because
        # glob returns matches in arbitrary order.
        lib_dirs = sorted(
            match
            for match in glob(os.path.join(env_prefix, 'lib', 'python*'))
            if os.path.isdir(match)
        )
    else:
        python_version = f'{sys.version_info.major}.{sys.version_info.minor}'
        lib_dirs = [
            os.path.join(env_prefix, 'lib', f'python{python_version}'),
            os.path.join(env_prefix, 'lib', 'python'),
        ]

    python_lib_dirs = []
    for lib_dir in lib_dirs:
        python_lib_dirs.append(lib_dir)
        python_lib_dirs.extend(
            os.path.join(lib_dir, sub_dir) for sub_dir in sub_dirs
        )
    return python_lib_dirs
def python_env_sys_path(env_prefix: str) -> List[str]:
    """
    Gets the relevant sys.path entries for a python environment
    by shelling out to its python and extracting sys.path, or
    (if python is not executable in env_prefix), inferring the
    possible env_prefix sys path entries by calling
    get_possible_sys_paths.

    :param env_prefix:
        Path to python environment prefix.
    :return:
        List of entries you might want to add to sys.path to
        make it possible to import code from env_prefix. May be
        empty if the environment's python reports no sys.path
        entry under env_prefix.
    :raises subprocess.CalledProcessError:
        If the environment's python exits with a non-zero status.
    """
    python_exec = os.path.join(env_prefix, 'bin', 'python')

    # If python is not available in env_prefix, infer the sys.path entries.
    if not os.access(python_exec, os.X_OK):
        return get_possible_sys_paths(env_prefix)

    # env_prefix is handed over as an argument (sys.argv[1]) instead of
    # being pasted into the code string, so that quotes or backslashes
    # in the path cannot break (or inject into) the executed snippet.
    command = [
        python_exec,
        '-c',
        "import sys; "
        "print('\\n'.join(p for p in sys.path if p.startswith(sys.argv[1])))",
        env_prefix,
    ]
    output = subprocess.check_output(command).decode('utf-8')

    # Drop empty lines: an empty string in sys.path means 'current
    # directory', which must not be added just because nothing matched.
    return [line for line in output.splitlines() if line]
def sys_path_prepend(paths: List[str]) -> None:
    """
    Prepends paths to sys.path, keeping their relative order.

    :param paths:
        paths to put in front of the existing sys.path entries
    """
    # Slice assignment mutates the existing list instead of rebinding
    # sys.path, so any reference to the list taken earlier
    # (e.g. `from sys import path`) sees the new entries too.
    sys.path[:0] = paths
def include_python_env_sys_path(other_python_env_prefix: str) -> List[str]:
    """
    Adds another python (e.g. conda) environment's sys.path entries
    to the front of the current running python's sys.path.

    Doing this lets you import python modules from the other
    environment, essentially 'stacking' this python on top of it.

    :param other_python_env_prefix:
        Path to the other python environment. Its sys.path entries are
        determined by python_env_sys_path.
    :return:
        List of path entries added to our sys.path
    """
    other_python_env_paths = python_env_sys_path(other_python_env_prefix)

    # dict.fromkeys de-duplicates while preserving order. Empty entries are
    # skipped because '' in sys.path means 'current directory'.
    paths_to_add = [
        path
        for path in dict.fromkeys(other_python_env_paths)
        if path and path not in sys.path
    ]
    sys_path_prepend(paths_to_add)
    return paths_to_add
def import_it(name: str):
    """
    Imports a python module / symbol from a string name and returns it.

    It is up to you to make sure that the module is importable
    via python sys.path.

    :param name:
        Python module / symbol name. Examples:
        * 'mypackage.thing' -> from mypackage import thing
        * 'mypackage:callable' -> from mypackage import callable
        * 'mypackage' -> import mypackage
    :return: imported Python module or symbol
    :raises ImportError: if the module cannot be imported.
    :raises AttributeError:
        if the module has neither an attribute nor a submodule
        with the requested symbol name.
    """
    if ':' in name:
        module_name, symbol_name = name.rsplit(':', 1)
    elif '.' in name:
        module_name, symbol_name = name.rsplit('.', 1)
    else:
        module_name, symbol_name = name, None

    module = importlib.import_module(module_name)
    # Built-in modules (e.g. sys, builtins) have no __file__ attribute.
    log.info('Imported %s as %s', name, getattr(module, '__file__', None))

    if symbol_name is None:
        return module

    try:
        return getattr(module, symbol_name)
    except AttributeError as attr_error:
        # Like `from package import thing`, fall back to importing a
        # submodule: a submodule that has not been imported yet is not an
        # attribute of its package. Only packages (which have __path__)
        # can have submodules.
        if not hasattr(module, '__path__') or '.' in symbol_name:
            raise
        submodule_name = f'{module_name}.{symbol_name}'
        try:
            return importlib.import_module(submodule_name)
        except ModuleNotFoundError as import_error:
            # A missing dependency *inside* an existing submodule is a
            # different problem and must not be hidden.
            if import_error.name != submodule_name:
                raise
            raise attr_error from None
def parse_callable_args(args=None, format='positional'):
    """
    Parses list of args into positional and kwargs for
    passing to a callable function.

    Can parse args in 3 different formats.

    - positional (default):
        Just use args as positional args, no kwargs.
    - argv
        Assume that args is meant to be passed to a CLI
        parsing function, e.g. as if it was sys.argv[1:].
        In this case, the first positional arg should be
        the argv array, so positional_args will be [args]
        and kwargs will be empty.
    - json
        Each arg that starts with ", [ or { is parsed as a json
        string. If the parsed arg is a dict, it is assumed to be
        kwargs, else it is a positional arg. Any other arg is
        passed through as a plain string; note that this includes
        bare numbers such as 5, which stay the string '5'.
        Use format=json if you need to pass complex argument
        types like lists as args to your callable.

    :param args:
        list of string args to parse. Defaults to no args.
        The given list is never modified or returned directly.
    :param format: One of positional, argv, or json.
    :return: tuple of (positional_args, kwargs)
    :raises ValueError:
        if format is not one of the supported formats, or
        (as json.JSONDecodeError) if a json arg is malformed.
    """
    # Copy, so that neither the caller's list nor a shared default
    # is handed out to (and possibly mutated by) the callable.
    args = [] if args is None else list(args)

    if format == 'positional':
        return (args, {})
    if format == 'argv':
        return ([args], {})
    if format != 'json':
        # Previously an unknown format silently discarded all args.
        raise ValueError(
            f"Unknown args format '{format}', "
            "must be one of positional, argv, or json."
        )

    positional_args = []
    kwargs = {}
    for arg in args:
        # If the arg does not start with a valid json opener,
        # assume it is a string value; don't bother parsing it.
        if not arg.startswith(('"', '[', '{')):
            positional_args.append(arg)
            continue

        parsed_arg = json.loads(arg)
        # If the arg is a dict, then assume it holds kwargs.
        # This won't work if the callable takes a dict as an arg.
        if isinstance(parsed_arg, dict):
            kwargs.update(parsed_arg)
        else:
            # Else it is a string or a list.
            positional_args.append(parsed_arg)
    return (positional_args, kwargs)
def call(name: str, positional_args, kwargs):
    """
    Looks up a python callable by its string name and invokes it.

    :param name:
        Importable name of the callable,
        e.g. my.package:callable
    :param positional_args:
        sequence of positional args handed to the callable.
    :param kwargs:
        mapping of keyword args handed to the callable.
    :return: whatever the callable returns
    """
    # Log first, so that the effective sys.path is on record even when
    # the import below fails.
    log.info(f'Calling {name} with {positional_args}, {kwargs}. sys.path: {sys.path}')
    target = import_it(name)
    result = target(*positional_args, **kwargs)
    return result
def cli(argv=None):
    """
    Imports callable and calls it with args.

    If --prefix is given (e.g. a conda environment prefix path),
    sys.path will be altered to include that python environment.
    This allows us to call a function that might have dependencies in
    another environment, effectively 'stacking' the current python env
    on the other env(s). --prefix may be given more than once.

    --args-format determines how to parse args for the callable. Any
    args that are not handled by call.py will be used for the callable.

    Examples:

        call.py shutil.which python
        call.py --prefix /my/conda/env myproject.transform arg1 arg2
        call.py --prefix /my/conda/env --args-format=argv myproject.main --arg1=arg1 --arg2=arg2
        call.py --prefix /my/conda/env --args-format=json myproject.fancyfunc \\
            '["elem1", "elem2"]' '{"kwarg1": "a"}'
    """
    # Function-scope import: only needed to tidy up the help text.
    import inspect

    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(
        # cli.__doc__ is None when python runs with -OO.
        description=inspect.cleandoc(cli.__doc__) if cli.__doc__ else None,
        # Keep the line breaks of the examples in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Without this, an option meant for the callable (e.g. --log or
        # --pre) would be matched as an abbreviation of one of ours.
        allow_abbrev=False,
    )
    parser.add_argument(
        '--prefix',
        action='append',
        dest='prefix',
        type=str,
        help='Python environment prefix to include in sys.path.',
    )
    parser.add_argument(
        '--log-level',
        dest='log_level',
        type=str,
        help='Log level of call.py logger',
    )
    parser.add_argument(
        '--args-format',
        dest='args_format',
        type=str,
        help='How to parse args for callable.',
        choices=[
            'positional',
            'argv',
            'json',
        ],
        default='positional',
    )
    parser.add_argument('callable')

    # callable_args will be anything not parsed by parser.
    (args, callable_args) = parser.parse_known_args(argv)

    if args.log_level:
        # logging only knows upper case level names; accept e.g. 'info' too.
        log.setLevel(args.log_level.upper())

    # --prefix is None when not given.
    for prefix in args.prefix or []:
        include_python_env_sys_path(prefix)

    (callable_pargs, callable_kwargs) = parse_callable_args(
        format=args.args_format,
        args=callable_args,
    )

    result = call(args.callable, callable_pargs, callable_kwargs)
    log.info(result)
# Script entry point: hand the command line (without the program name)
# to cli when this file is executed directly.
if __name__ == '__main__':
    cli(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment