Last active
December 6, 2017 19:30
-
-
Save blvp/57d1b9f93b0a07fb94d693f7fde1425b to your computer and use it in GitHub Desktop.
mist python api proposal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import functools | |
| from abc import abstractmethod | |
def with_args(*args_decorator):
    """Combine several argument decorators into a single decorator.

    The decorators are applied in the order given, so the last one listed
    becomes the outermost wrapper.  The decorated callable exposes the tuple
    of applied decorators through its ``arg_defs`` attribute.
    """
    def wrapper(f):
        # Fold the decorators over the function, first one innermost.
        decorated = functools.reduce(
            lambda wrapped, decorate: decorate(wrapped), args_decorator, f)
        decorated.arg_defs = args_decorator
        return decorated
    return wrapper
class BadParameterException(Exception):
    """Raised when a job function receives a missing or invalid parameter.

    Two construction forms are supported:

    * ``BadParameterException(message)`` - a single free-form message;
    * ``BadParameterException(name, type, additional_info)`` - the offending
      parameter's name, its expected type and an optional explanation.
    """

    def __init__(self, name, type=None, additional_info=""):
        # This method used to be spelled ``__init`` and therefore never ran,
        # leaving the attributes read by ``__str__`` undefined.  ``type`` is
        # optional so that message-only construction keeps working.
        if type is None and not additional_info:
            super(BadParameterException, self).__init__(name)
        else:
            super(BadParameterException, self).__init__(
                name, type, additional_info)
        self.name = name
        self.type = type
        self.additional_info = additional_info

    def __str__(self):
        # Message-only form: the single argument is the whole message.
        if self.type is None and not self.additional_info:
            return str(self.name)
        return "Bad parameter {} of type {}: {}".format(
            self.name, self.type, self.additional_info)
class Extras(object):
    """Holds a job id and a logging configuration, with a lazily built logger."""

    def __init__(self, job_id, log_conf):
        self._log_conf = log_conf
        self._logger = None
        self._job_id = job_id

    @property
    def job_id(self):
        """The job id given to the constructor."""
        return self._job_id

    @property
    def logger(self):
        """The logger, requested from the factory while none is cached.

        NOTE: the factory below is still a stub that returns ``None``, so
        this property currently always yields ``None``.
        """
        current = self._logger
        if current is None:
            current = self._logger = self.__create_logger_from_conf()
        return current

    def __create_logger_from_conf(self):
        # Placeholder: touches the stored configuration but builds nothing.
        _ = self._log_conf
        return None
| # class ArgDecorator(object): | |
| # def __init__(self, name, type_hint, default=None, optional=False, callback=None): | |
| # self.name = name | |
| # self.type_hint = type_hint | |
| # self.default = default | |
| # self.optional = optional | |
| # self.callback = callback | |
| # def __call__(self, func): | |
class AbstractArg(object):
    """Base class for decorators that describe one named argument.

    An instance is callable: applied to a function it returns a wrapper that
    passes the call's positional and keyword arguments through ``_decorate``
    before invoking the function.

    NOTE: ``_decorate`` is marked ``@abstractmethod`` but the class does not
    use ``ABCMeta``, so instantiating this base class is not prevented.
    """

    def __init__(self, name, type_hint, callback=None):
        self.name, self.type_hint, self.callback = name, type_hint, callback

    @abstractmethod
    def _decorate(self, fn_args, fn_kwargs):
        """Return the ``(args, kwargs)`` pair to call the function with.

        The base implementation passes both through unchanged.
        """
        return fn_args, fn_kwargs

    def __call__(self, fn):
        @functools.wraps(fn)
        def invoke(*args, **kwargs):
            positional, named = self._decorate(args, kwargs)
            return fn(*positional, **named)
        return invoke
class Arg(AbstractArg):
    """Required argument: the decorated call must supply it by keyword."""

    def _decorate(self, fn_args, fn_kwargs):
        """Check that the argument is present and pass the call through.

        Raises:
            BadParameterException: if ``self.name`` is not among the keyword
                arguments of the call.
        """
        if self.name not in fn_kwargs:
            # ``str.format`` needs a ``{}`` placeholder; the previous ``%s``
            # was left in the message verbatim and the name was dropped.
            raise BadParameterException(
                "value should be presented in kwargs by {}".format(self.name))
        return fn_args, fn_kwargs
class OptionalArg(AbstractArg):
    """Argument that falls back to a default when the caller omits it."""

    def __init__(self, name, type_hint, callback=None, default=None):
        super(OptionalArg, self).__init__(name, type_hint, callback)
        self.default = default

    def _decorate(self, fn_args, fn_kwargs):
        """Insert the default under ``self.name`` unless the key is present."""
        # setdefault leaves an explicitly supplied value untouched.
        fn_kwargs.setdefault(self.name, self.default)
        return fn_args, fn_kwargs
def internal_arg(arg_type, tags=None):
    """Build a decorator for functions taking exactly one positional argument.

    Args:
        arg_type: value stored on the wrapper as ``internal_arg_type``.
        tags: optional iterable of tags; a fresh ``set`` built from it is
            stored on the wrapper as ``fn_tags`` (empty when omitted).

    Returns:
        A decorator.  The wrapper it produces forwards the single positional
        argument plus all keyword arguments to the wrapped function and
        raises ``BadParameterException`` when the number of positional
        arguments is not exactly one.
    """
    def wrapper(f):
        @functools.wraps(f)
        def yoyo(*args, **kwargs):
            if len(args) != 1:
                # ``{}`` (not ``%s``) so that ``format`` really inserts the count.
                raise BadParameterException(
                    "only one internal argument supported, but got {}".format(
                        len(args)))
            return f(args[0], **kwargs)
        yoyo.internal_arg_type = arg_type
        # Every wrapper gets its own set: the former ``tags=set()`` default
        # was one object shared by all functions decorated without tags.
        yoyo.fn_tags = set(tags) if tags is not None else set()
        return yoyo
    return wrapper
def arg(name, type_hint, default=None, callback=None):
    """Create the argument decorator for ``name``.

    Returns an ``OptionalArg`` when ``default`` is not ``None`` and a required
    ``Arg`` otherwise; consequently ``None`` itself cannot be used as the
    default of an optional argument.
    """
    if default is not None:
        return OptionalArg(name, type_hint, callback, default)
    return Arg(name, type_hint, callback)
# Ready-made decorators built with internal_arg: each records the given
# internal argument type and tag set on the function it decorates.
on_spark_context = internal_arg('sc')
on_spark_session = internal_arg('session', tags={'sql'})
on_sql_context = internal_arg('sql_context', tags={'sql'})
on_hive_context = internal_arg('hive_context', tags={'sql', 'hive'})
on_hive_session = internal_arg('hive_session', tags={'sql', 'hive'})
on_streaming_context = internal_arg('streaming', tags={'streaming'})
def make_extras_wrapper():
    """Build a decorator that insists on an ``extras`` keyword argument.

    The wrapper it produces raises ``BadParameterException`` when the call has
    no ``extras`` keyword, otherwise it forwards the call unchanged.  The
    wrapper is flagged with ``with_extras = True``.
    """
    def wrapper(f):
        @functools.wraps(f)
        def checked(*args, **kwargs):
            if 'extras' in kwargs:
                return f(*args, **kwargs)
            raise BadParameterException(
                "mist extras should be presented in kwargs by name 'extras'")
        checked.with_extras = True
        return checked
    return wrapper


# Shared decorator instance used to mark functions that take ``extras``.
with_mist_extras = make_extras_wrapper()
| """ | |
| class ttype(object): | |
| pass | |
| class complex_type(ttype): | |
| def __init__(self, box_type, boxed_type): | |
| self.box_type = box_type | |
| self.boxed_type = boxed_type | |
| def listt(boxed_type): | |
| return complex_type(list, boxed_type) | |
| def mapt(boxed_type): | |
| return complex_type(map, boxed_type) | |
| """ | |
@with_args(
    arg('num', list),
    arg('mul', int, default=2),
)
@with_mist_extras
@on_spark_session
def my_awesome_fn(sc, extras, num, mul):
    """Demo function: return the received arguments as ``(sc, num, mul, extras)``."""
    received = (sc, num, mul, extras)
    return received
# Demo run.  ``print`` is called with one parenthesised argument so these
# lines parse, and print the same text, under both Python 2 and Python 3
# (the former print statements are a SyntaxError on Python 3).
print(my_awesome_fn(42, num=[1, 2, 3], extras=Extras('123', '123')))
print("{} {} {} {}".format(
    my_awesome_fn.internal_arg_type,
    my_awesome_fn.fn_tags,
    my_awesome_fn.arg_defs,
    my_awesome_fn.with_extras))
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The arg name here and the parameter name in the function definition have to be the same.
In a Scala job, for example, this is not required:
you can define a lambda function with any parameter names you want.
Is it a big deal?