Skip to content

Instantly share code, notes, and snippets.

@lucemia
Created June 16, 2014 14:54
Show Gist options
  • Select an option

  • Save lucemia/23adeed1f577aacf571a to your computer and use it in GitHub Desktop.

Select an option

Save lucemia/23adeed1f577aacf571a to your computer and use it in GitHub Desktop.
def load_pipeline(cls_path):
module_path, class_name = ".".join(cls_path.split('.')[:-1]), cls_path.split('.')[-1]
mod = __import__(module_path, fromlist=[class_name])
return getattr(mod, class_name)
class TriggerHandler(ApiHandler):
def get(self):
id = self.request.get("id")
path = self.request.get("path")
args = self.request.get("args")
method = method.lower()
assert path
cls = load_pipeline(path)
p = cls(*params)
p.start()
task_id = id or p.root_pipeline_id
_TA_Task(
id=task_id,
root_pipeline_id=p.root_pipeline_id,
params={
"id": id,
"path": path,
"params": params
}
).put()
return self.output({
"id": task_id
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment