Skip to content

Instantly share code, notes, and snippets.

@blvp
Last active December 6, 2017 19:30
Show Gist options
  • Select an option

  • Save blvp/57d1b9f93b0a07fb94d693f7fde1425b to your computer and use it in GitHub Desktop.

Select an option

Save blvp/57d1b9f93b0a07fb94d693f7fde1425b to your computer and use it in GitHub Desktop.
mist python api proposal
import functools
from abc import abstractmethod
def with_args(*args_decorator):
    """Build a decorator that applies every given argument decorator.

    The decorators are applied in the order given, so the last one ends up
    outermost. The resulting callable receives an ``arg_defs`` attribute
    holding the tuple of decorators that were applied.
    """
    def wrapper(f):
        # Fold the decorators over the function, first decorator innermost.
        decorated = functools.reduce(
            lambda fn, decorate: decorate(fn), args_decorator, f)
        decorated.arg_defs = args_decorator
        return decorated
    return wrapper
class BadParameterException(Exception):
    """Signals a bad parameter passed to a decorated function.

    Can be built from a single ready-made message
    (``BadParameterException("something went wrong")``) or from structured
    details (``BadParameterException(name, type, additional_info)``).

    Args:
        name: parameter name, or the whole message in the one-argument form.
        type: type associated with the parameter (optional).
        additional_info: free-form detail appended to the message (optional).
    """

    def __init__(self, name, type=None, additional_info=""):
        # The constructor used to be spelled ``__init`` and therefore never
        # ran, leaving the attributes read by ``__str__`` unset.
        super(BadParameterException, self).__init__(
            name, type, additional_info)
        self.name = name
        self.type = type
        self.additional_info = additional_info

    def __str__(self):
        if self.type is None and not self.additional_info:
            # One-argument form: the first argument is the whole message.
            return str(self.name)
        return "Bad parameter {} of type {}: {}".format(
            self.name, self.type, self.additional_info)
class Extras(object):
    """Bundle of per-job helpers: the job id and a lazily created logger."""

    def __init__(self, job_id, log_conf):
        self._job_id, self._log_conf = job_id, log_conf
        self._logger = None  # filled in on first access of ``logger``

    @property
    def job_id(self):
        """Job identifier given at construction time (read-only)."""
        return self._job_id

    @property
    def logger(self):
        """Logger for this job, built on first access.

        NOTE(review): the factory below is still a stub returning None, so
        this property currently always yields None.
        """
        if self._logger is not None:
            return self._logger
        self._logger = self.__create_logger_from_conf()
        return self._logger

    def __create_logger_from_conf(self):
        # Placeholder: touches the stored configuration but builds nothing.
        self._log_conf
        return None
# class ArgDecorator(object):
# def __init__(self, name, type_hint, default=None, optional=False, callback=None):
# self.name = name
# self.type_hind = type_hint
# self.default = default
# self.optional = optional
# self.callback = callback
# def __call__(self, func):
class AbstractArg(object):
    """Base for argument declarations that double as function decorators.

    An instance stores the argument's name, type hint and optional callback.
    Calling the instance on a function returns a wrapper that passes the
    call's arguments through ``_decorate`` before invoking the function.

    NOTE(review): ``@abstractmethod`` is not enforced here because the class
    does not use an ABCMeta metaclass; direct instantiation still works.
    """

    def __init__(self, name, type_hint, callback=None):
        self.name, self.type_hint, self.callback = name, type_hint, callback

    @abstractmethod
    def _decorate(self, fn_args, fn_kwargs):
        """Hook for subclasses: inspect or adjust the call arguments.

        Returns the ``(args, kwargs)`` pair to call the wrapped function
        with; this base version hands both back unchanged.
        """
        return fn_args, fn_kwargs

    def __call__(self, fn):
        """Wrap ``fn`` so every call is first routed through ``_decorate``."""
        @functools.wraps(fn)
        def checked_call(*args, **kwargs):
            call_args, call_kwargs = self._decorate(args, kwargs)
            return fn(*call_args, **call_kwargs)
        return checked_call
class Arg(AbstractArg):
    """Required keyword argument: the call fails unless it is supplied."""

    def _decorate(self, fn_args, fn_kwargs):
        """Pass the call through unchanged if ``self.name`` is in the kwargs.

        Returns:
            The ``(fn_args, fn_kwargs)`` pair, untouched.

        Raises:
            BadParameterException: if ``self.name`` is not a key of
                ``fn_kwargs``.
        """
        if self.name not in fn_kwargs:
            # ``str.format`` only fills ``{}`` fields; the previous ``%s``
            # placeholder left the argument name out of the message.
            raise BadParameterException(
                "value should be presented in kwargs by {}".format(self.name))
        return fn_args, fn_kwargs
class OptionalArg(AbstractArg):
    """Keyword argument that falls back to a default when not supplied."""

    def __init__(self, name, type_hint, callback=None, default=None):
        super(OptionalArg, self).__init__(name, type_hint, callback)
        self.default = default

    def _decorate(self, fn_args, fn_kwargs):
        """Insert ``self.default`` under ``self.name`` when the key is absent.

        A value supplied by the caller is left alone. The kwargs dict is
        updated in place and returned together with the positional args.
        """
        fn_kwargs.setdefault(self.name, self.default)
        return fn_args, fn_kwargs
def internal_arg(arg_type, tags=None):
    """Build a decorator for functions taking one positional argument.

    The wrapper forwards its single positional argument plus all keyword
    arguments to the decorated function, and carries two attributes:
    ``internal_arg_type`` (the given ``arg_type``) and ``fn_tags``.

    Args:
        arg_type: value stored on the wrapper as ``internal_arg_type``.
        tags: set stored on the wrapper as ``fn_tags``. When omitted, each
            decorated function gets its own new empty set.

    Returns:
        A decorator. The wrapper it produces raises BadParameterException
        when called with any number of positional arguments other than one.
    """
    def wrapper(f):
        @functools.wraps(f)
        def single_arg_call(*args, **kwargs):
            if len(args) != 1:
                # ``str.format`` needs ``{}``; the previous ``%s`` placeholder
                # never showed the actual argument count.
                raise BadParameterException(
                    "only one internal argument supported, but got {}".format(
                        len(args)))
            return f(args[0], **kwargs)
        single_arg_call.internal_arg_type = arg_type
        # A ``set()`` default in the signature would be one object shared by
        # every decorated function; build a fresh one per function instead.
        single_arg_call.fn_tags = set() if tags is None else tags
        return single_arg_call
    return wrapper
def arg(name, type_hint, default=None, callback=None):
    """Declare a function argument, required unless a default is given.

    A ``default`` of None yields a required ``Arg``; any other value yields
    an ``OptionalArg`` carrying that default.
    """
    if default is not None:
        return OptionalArg(name, type_hint, callback, default)
    return Arg(name, type_hint, callback)
# Ready-made single-argument decorators: the first value is recorded as the
# wrapper's ``internal_arg_type`` and ``tags`` as its ``fn_tags``.
on_spark_context = internal_arg('sc')
on_spark_session = internal_arg('session', tags={'sql'})
on_sql_context = internal_arg('sql_context', tags={'sql'})
on_hive_context = internal_arg('hive_context', tags={'sql', 'hive'})
on_hive_session = internal_arg('hive_session', tags={'sql', 'hive'})
on_streaming_context = internal_arg('streaming', tags={'streaming'})
def make_extras_wrapper():
    """Build a decorator that insists on an ``extras`` keyword argument.

    The produced wrapper raises BadParameterException when ``extras`` is not
    among the keyword arguments; otherwise it calls the decorated function
    with the arguments untouched. The wrapper is marked with
    ``with_extras = True``.
    """
    def wrapper(f):
        @functools.wraps(f)
        def require_extras(*args, **kwargs):
            if 'extras' in kwargs:
                return f(*args, **kwargs)
            raise BadParameterException('mist extras should be presented in kwargs by name \'extras\'')
        require_extras.with_extras = True
        return require_extras
    return wrapper


with_mist_extras = make_extras_wrapper()
"""
class ttype(object):
pass
class complex_type(ttype):
def __init__(self, box_type, boxed_type):
self.box_type = box_type
self.boxed_type = boxed_type
def listt(boxed_type):
return complex_type(list, boxed_type)
def mapt(boxed_type):
return complex_type(map, boxed_type)
"""
@with_args(arg('num', list), arg('mul', int, default=2))
@with_mist_extras
@on_spark_session
def my_awesome_fn(sc, extras, num, mul):
    """Demo job: hand back the context and the declared arguments."""
    result = (sc, num, mul, extras)
    return result
# Demo: call the decorated job, then show the metadata the decorators attached.
# The function-call form of print with a single argument produces the same
# output on Python 2 and Python 3; the old print statement is a SyntaxError
# on Python 3.
print(my_awesome_fn(42, num=[1, 2, 3], extras=Extras('123', '123')))
print(' '.join(str(value) for value in (
    my_awesome_fn.internal_arg_type,
    my_awesome_fn.fn_tags,
    my_awesome_fn.arg_defs,
    my_awesome_fn.with_extras,
)))
@blvp
Copy link
Author

blvp commented Dec 6, 2017

@with_args(arg('num', list), arg('mul', int, default=2))
@on_spark_context
def my_awesome_fn(sc, num, mul):

The arg name here must match the parameter name in the function definition.
In a Scala job, for example, that is not required:

def handle: Handle = (arg[Seq[Int]]("num"), arg[Int]("mul", 2)).onSparkContext((sc, n, m) => ???)

You can define the lambda function with any parameter names you want.
Is it a big deal?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment