Created
April 8, 2016 11:32
-
-
Save seregasheypak/b54813838076e62684dff783303f4ee9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyBaseOperator(BaseOperator):
    """Base operator giving tasks access to a pipeline-wide build id.

    The build id is read from XCom: it is looked up under the key
    ``PIPELINE_BUILD_ID_KEY`` as pushed by the task whose id is ``TASK_ID``
    in the same DAG.
    """

    # task_id of the task whose XCom entry carries the build id.
    TASK_ID = 'BUILD_ID_TASK'
    # XCom key under which the build id is stored.
    PIPELINE_BUILD_ID_KEY = 'BUILD_ID'

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MyBaseOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """No-op; concrete operators override this."""
        pass

    def get_pipeline_build_id(self, context):
        """Return the build id stored in XCom for this DAG.

        :param context: the task execution context handed to ``execute``.
        :return: the value pulled from XCom (expected to be ``None`` when
            nothing has been pushed yet -- see the Airflow ``xcom_pull``
            documentation).
        """
        # The pulled value must be returned explicitly; without the
        # ``return`` this method always evaluated to None.
        # The task id is passed as a plain string on purpose: per the Airflow
        # docs a list of task ids yields a tuple of values, which would be
        # truthy even when empty-handed (``(None,)``) and would end up
        # verbatim in the base path.
        return self.xcom_pull(context=context,
                              dag_id=self.dag_id,
                              task_ids=MyBaseOperator.TASK_ID,
                              key=MyBaseOperator.PIPELINE_BUILD_ID_KEY)

    def get_pipeline_base_path(self, context):
        """Return ``"<dag_id>/<build_id>"`` and log it.

        :param context: the task execution context handed to ``execute``.
        """
        base_path = "{}/{}".format(self.dag_id, self.get_pipeline_build_id(context))
        logging.info("Pipeline base path: {}".format(base_path))
        return base_path
class BuildIdOperator(MyBaseOperator):
    """Generate a build id and push it to XCom unless one is already present.

    The operator always registers itself under ``MyBaseOperator.TASK_ID``.
    Callers must not pass their own ``task_id``: it would be supplied twice
    to the parent constructor.
    """

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(BuildIdOperator, self).__init__(task_id=MyBaseOperator.TASK_ID, *args, **kwargs)

    def execute(self, context):
        """Push a freshly generated build id if none is available yet.

        :param context: the task execution context supplied by Airflow.
        """
        # Look the id up once and reuse the result for both the check and
        # the log message.
        existing_build_id = self.get_pipeline_build_id(context)
        if existing_build_id:
            logging.info("Build_id [{}] is assigned for this DAG execution, skipping BuildId generation."
                         .format(existing_build_id))
            return

        build_id = BuildIdOperator.generate_build_id()
        logging.info("Generating build_id:{}={}".format(BuildIdOperator.PIPELINE_BUILD_ID_KEY, build_id))
        self.xcom_push(context=context, key=BuildIdOperator.PIPELINE_BUILD_ID_KEY, value=build_id)

        # Temporary diagnostics: read the value back through the inherited
        # helper ...
        logging.info("Kill me later, check that id saved:{}".format(self.get_pipeline_build_id(context)))
        # ... and through a direct xcom_pull with the same arguments, so the
        # two results can be compared in the task log.
        here = self.xcom_pull(context=context, dag_id=self.dag_id, task_ids=[MyBaseOperator.TASK_ID],
                              key=MyBaseOperator.PIPELINE_BUILD_ID_KEY)
        logging.info("HERE???? {}".format(here))

    @staticmethod
    def generate_build_id(ms_length=3):
        """Return a timestamp-based build id.

        The id is ``datetime.now()`` formatted as ``%Y%m%d_%H%M%S_%f`` with
        the last ``ms_length`` characters removed.

        :param ms_length: number of trailing characters (microsecond digits)
            to drop; ``0`` keeps the full timestamp.
        """
        from datetime import datetime
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        # ``stamp[:-0]`` is ``stamp[:0]`` (an empty string), so "drop
        # nothing" has to be handled separately.
        if ms_length == 0:
            return stamp
        return stamp[:-ms_length]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment