Containerized task framework to run ML models
# MIT License
# Copyright (c) 2019 Bellhops Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging

from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow import utils as airflow_utils
class ContainerSubdagOperator(SubDagOperator):
    @airflow_utils.apply_defaults
    def __init__(self, dag, task_id, start_date, schedule_interval, default_args, containers_config, containers, **kwargs):
        self.start_date = start_date
        self.dag_schedule_interval = schedule_interval
        self.default_args = default_args
        self.containers_config = containers_config
        self.containers = containers
        self.tasks = {}
        from airflow import DAG  # imported locally to avoid a circular import
        self.sub_dag_name = dag.dag_id + '.' + task_id
        self.subdag = DAG(
            self.sub_dag_name,
            start_date=self.start_date,
            schedule_interval=self.dag_schedule_interval,
            default_args=self.default_args
        )
        self.init_tasks()
        super(ContainerSubdagOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            subdag=self.subdag
        )

    @property
    def task_type(self):
        return 'SubDagOperator'

    def init_tasks(self):
        # Create one task per container: a BranchSubdagOperator when the
        # container fans out across branches, otherwise a plain ContainerOperator.
        for container in self.containers:
            if 'run_branches' in container and container['run_branches']:
                task = BranchSubdagOperator(
                    task_id=container['name'],
                    container=container,
                    containers_config=self.containers_config,
                    dag=self.subdag,
                    branch_names=container['branch_names'],
                    email=[container['owner']],
                    schedule_interval=self.dag_schedule_interval,
                    default_args=self.default_args,
                    start_date=self.start_date
                )
            else:
                task = ContainerOperator(
                    container=container,
                    containers_config=self.containers_config,
                    dag=self.subdag,
                    email=[container['owner']]
                )
            self.tasks[container['name']] = task
        # Wire up dependencies once every task exists.
        for container in self.containers:
            for dependency_name in container['depends_on']:
                self.tasks[container['name']].set_upstream(self.tasks[dependency_name])
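For reference, a hypothetical sketch of the two configuration structures, inferred from the keys the operators in this file read; the keys come from the code, but every name and value below is illustrative.

# Illustrative shapes only; concrete values are assumptions, not part of the gist.
containers_config = {
    'S3_BUCKET': 'example-ml-artifacts',    # hypothetical bucket name
    'S3_DIRECTORY': 'model-outputs',        # hypothetical key prefix
    'SHARED_DIRECTORY': '/tmp/containers',  # where repos are cloned on the worker
}

containers = [
    {
        'name': 'feature_builder',          # becomes the task_id and image tag
        'repo': 'git@github.com:example/feature-builder.git',
        'owner': 'data-team@example.com',   # used for failure emails
        'depends_on': [],                   # upstream container names
    },
    {
        'name': 'churn_model',
        'repo': 'git@github.com:example/churn-model.git',
        'owner': 'data-team@example.com',
        'depends_on': ['feature_builder'],
        'run_branches': True,               # fan out into one task per branch
        'branch_names': ['staging', 'master'],
        'cli_options': '--retrain',         # appended to the run command
    },
]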
class BranchSubdagOperator(SubDagOperator):
    @airflow_utils.apply_defaults
    def __init__(self, dag, task_id, start_date, schedule_interval, default_args, containers_config, container, branch_names, **kwargs):
        self.start_date = start_date
        self.dag_schedule_interval = schedule_interval
        self.default_args = default_args
        self.containers_config = containers_config
        self.container = container
        self.branch_names = branch_names
        self.tasks = {}
        from airflow import DAG  # imported locally to avoid a circular import
        self.sub_dag_name = dag.dag_id + '.' + task_id
        self.subdag = DAG(
            self.sub_dag_name,
            start_date=self.start_date,
            schedule_interval=self.dag_schedule_interval,
            default_args=self.default_args
        )
        self.init_tasks()
        super(BranchSubdagOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            subdag=self.subdag
        )

    @property
    def task_type(self):
        return 'SubDagOperator'

    def init_tasks(self):
        # Create one ContainerOperator per branch and chain them sequentially,
        # so the same repository is never built and run concurrently.
        if 'run_branches' in self.container and self.container['run_branches']:
            last_task_name = None
            for branch_name in self.container['branch_names']:
                task = ContainerOperator(
                    task_id=branch_name,
                    container=self.container,
                    containers_config=self.containers_config,
                    dag=self.subdag,
                    branch_name=branch_name,
                    email=[self.container['owner']]
                )
                self.tasks[branch_name] = task
                if last_task_name is not None:
                    self.tasks[branch_name].set_upstream(self.tasks[last_task_name])
                last_task_name = branch_name
class ContainerOperator(BashOperator):
    @airflow_utils.apply_defaults
    def __init__(self, dag, container, containers_config, branch_name=None, task_id=None, **kwargs):
        self.containers_config = containers_config
        self.branch_name = branch_name
        shell_script_template = self.build_shell_script()
        params = self.get_params(container=container)
        task_id = task_id or container['name']
        super(ContainerOperator, self).__init__(
            dag=dag,
            task_id=task_id,
            bash_command=shell_script_template,
            params=params,
            email=[container['owner']]
        )

    def get_params(self, container):
        params = {}
        s3_conn = S3Hook(aws_conn_id='s3_conn')
        aws_key, aws_pass, _, _ = s3_conn._get_credentials(None)
        # Set defaults
        params['run_command'] = 'python3 run.py'
        params['env'] = {
            'AWS_ACCESS_KEY_ID': aws_key,
            'AWS_SECRET_ACCESS_KEY': aws_pass,
            'S3_BUCKET': self.containers_config['S3_BUCKET'],
            'S3_DIRECTORY': self.containers_config['S3_DIRECTORY'],
        }
        params['shared_directory'] = self.containers_config['SHARED_DIRECTORY']
        if 'cli_options' in container:
            params['cli_options'] = container['cli_options']
        else:
            params['cli_options'] = ''
        if self.branch_name:
            # Clone a specific branch and namespace the checkout and image by branch.
            params['repo'] = ' -b {branch_name} {repo}'.format(branch_name=str(self.branch_name),
                                                               repo=container['repo'])
            params['branch_name'] = self.branch_name
            params['name'] = container['name'] + '/' + self.branch_name
        else:
            params['repo'] = container['repo']
            params['name'] = container['name']
        return params
    @staticmethod
    def build_shell_script():
        '''
        On macOS, replace the DOCKER_HOST/DOCKER_PORT lines below with:
            echo DOCKER_HOST='host.docker.internal' >> .env
            export DOCKER_HOST='host.docker.internal'
            echo DOCKER_PORT=2377 >> .env
            export DOCKER_PORT=2377
        '''
        shell_script_template = '''
            rm -rf {{ params.shared_directory }}/{{ params.name }}
            if [ ! -d {{ params.shared_directory }}/{{ params.name }} ]
            then
                mkdir -p {{ params.shared_directory }}/{{ params.name }}
            fi
            git clone {{ params.repo }} {{ params.shared_directory }}/{{ params.name }}
            cd {{ params.shared_directory }}/{{ params.name }}
            # The default route points at the host, where the Docker daemon listens.
            echo DOCKER_HOST=`/sbin/ip route|awk '/default/ { print $3 }'` >> .env
            export DOCKER_HOST=`/sbin/ip route|awk '/default/ { print $3 }'`
            echo DOCKER_PORT=2375 >> .env
            export DOCKER_PORT=2375
            {% for key, value in params.env.items() %}
            echo "{{ key }}={{ value }}" >> .env
            {% endfor %}
            docker -H tcp://$DOCKER_HOST:$DOCKER_PORT build -t {{ params.name }} .
            docker -H tcp://$DOCKER_HOST:$DOCKER_PORT run --env-file=.env {{ params.name }} {{ params.run_command }} {{ params.cli_options }}
            docker -H tcp://$DOCKER_HOST:$DOCKER_PORT rm $(docker -H tcp://$DOCKER_HOST:$DOCKER_PORT ps -a | grep {{ params.name }} | cut -f1 -d' ')
        '''
        return shell_script_template
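A minimal usage sketch, assuming the operators above are importable and reusing the hypothetical containers_config and containers from earlier; the dag_id, dates, and schedule are placeholders, not part of the gist.

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'data-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

start_date = datetime(2019, 1, 1)

dag = DAG(
    'ml_models',                          # hypothetical dag_id
    start_date=start_date,
    schedule_interval='@daily',
    default_args=default_args,
)

# One SubDAG task that fans out into a build-and-run task per container,
# respecting each container's depends_on ordering.
run_models = ContainerSubdagOperator(
    dag=dag,
    task_id='run_models',
    start_date=start_date,
    schedule_interval='@daily',
    default_args=default_args,
    containers_config=containers_config,  # defined above
    containers=containers,                # defined above
)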