Skip to content

Instantly share code, notes, and snippets.

@seregasheypak
Created April 8, 2016 11:32
Show Gist options
  • Save seregasheypak/b54813838076e62684dff783303f4ee9 to your computer and use it in GitHub Desktop.
Save seregasheypak/b54813838076e62684dff783303f4ee9 to your computer and use it in GitHub Desktop.
class MyBaseOperator(BaseOperator):
    """Base operator for tasks that share a pipeline build id via XCom.

    The helpers here read the XCom stored under ``PIPELINE_BUILD_ID_KEY`` by
    the task whose id is ``TASK_ID``, within the operator's own DAG.
    """

    # Id of the task whose XCom holds the build id.
    TASK_ID = 'BUILD_ID_TASK'
    # XCom key under which the build id is stored.
    PIPELINE_BUILD_ID_KEY = 'BUILD_ID'

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MyBaseOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Do nothing; subclasses override this with their actual work."""
        pass

    def get_pipeline_build_id(self, context):
        """Return the build id pulled from XCom.

        :param context: the Airflow task context passed to ``execute``.
        :return: the stored build id, or ``None`` when no matching XCom
            exists (per Airflow's documented ``xcom_pull`` behaviour).
        """
        # The pulled value must be returned explicitly; without ``return``
        # this method always evaluated to None regardless of what was stored.
        # A single task id string (rather than a one-element list) is passed
        # because Airflow documents that a list of task ids yields a sequence
        # of values, which would be truthy even when nothing was stored and
        # would be formatted verbatim into the base path.
        return self.xcom_pull(
            context=context,
            dag_id=self.dag_id,
            task_ids=MyBaseOperator.TASK_ID,
            key=MyBaseOperator.PIPELINE_BUILD_ID_KEY,
        )

    def get_pipeline_base_path(self, context):
        """Return ``"<dag_id>/<build_id>"`` for this run and log it.

        :param context: the Airflow task context passed to ``execute``.
        :return: the base path string built from the DAG id and build id.
        """
        base_path = "{}/{}".format(self.dag_id, self.get_pipeline_build_id(context))
        logging.info("Pipeline base path: {}".format(base_path))
        return base_path
class BuildIdOperator(MyBaseOperator):
    """Push a freshly generated build id to XCom unless one is already set.

    The task id is always ``MyBaseOperator.TASK_ID``.
    """

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(BuildIdOperator, self).__init__(task_id=MyBaseOperator.TASK_ID, *args, **kwargs)

    def execute(self, context):
        """Generate and push a build id when none is available for this run.

        :param context: the Airflow task context.
        :return: None (nothing is returned, so no return-value XCom is pushed
            by this method itself).
        """
        # Look the id up once and reuse it for both the check and the log
        # message instead of querying XCom again.
        existing_build_id = self.get_pipeline_build_id(context)
        if existing_build_id:
            logging.info("Build_id [{}] is assigned for this DAG execution, skipping BuildId generation."
                         .format(existing_build_id))
            return

        build_id = BuildIdOperator.generate_build_id()
        logging.info("Generating build_id:{}={}".format(BuildIdOperator.PIPELINE_BUILD_ID_KEY, build_id))
        self.xcom_push(context=context, key=BuildIdOperator.PIPELINE_BUILD_ID_KEY, value=build_id)

    @staticmethod
    def generate_build_id(ms_length=3):
        """Return a timestamp-based id such as ``20160408_113200_123``.

        :param ms_length: number of trailing digits to drop from the 6-digit
            microsecond field; the default of 3 leaves millisecond precision.
        :return: the current local time formatted as
            ``%Y%m%d_%H%M%S_%f`` with ``ms_length`` trailing characters removed.
        """
        from datetime import datetime
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        # ``stamp[:-0]`` is ``stamp[:0]`` (an empty string), so dropping zero
        # digits has to be handled explicitly to keep the full timestamp.
        return stamp[:-ms_length] if ms_length else stamp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment