Created
September 18, 2018 07:06
-
-
Save mdaniel/94e83f2c1bbf8ea5af542e5832179231 to your computer and use it in GitHub Desktop.
Atomwise
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env bash
# Start a local queue worker in the background, submit one deploy message to
# the queue, and stop the worker again when this script exits.
# Fail fast on errors, unset variables and broken pipelines.
set -euo pipefail

image_name="get the ECR image name through tomfoolery"

export JOB_Q='arn:our-ci-cd-queue'
python -u queue_worker.py &
worker_pid=$!
# A non-interactive bash starts background jobs with SIGINT ignored, and
# Python leaves an inherited "ignore" disposition alone, so `kill -INT` would
# never stop the worker.  SIGTERM is not ignored and ends it reliably.
# `|| true` keeps the exit status intact when the worker is already gone.
trap 'kill -TERM "$worker_pid" 2>/dev/null || true' EXIT

export TASK_ARN='arn:a-ci-cd-specific-task'
# "-" tells queue_submit.py to read the message body from stdin
python -u queue_submit.py - <<DOIT
deploy_queue_scheduler $image_name
DOIT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env bash
# Build the queue-worker image and tag it with the current build number.
set -euo pipefail

# Abort with message $2 on stderr unless command $1 can be found.
require_command() {
    if ! command -v "$1" >/dev/null 2>&1; then
        echo "$2" >&2
        exit 1
    fi
}

require_command aws 'Not without awscli'
require_command docker 'Not without docker'
require_command jq 'Not without "jq"'

image_name="queue-worker"
# BUILD_NUMBER is expected from the environment (presumably the CI server);
# name the missing variable explicitly instead of a bare "unbound variable"
image_tag="${BUILD_NUMBER:?BUILD_NUMBER must be set}"
local_image="${image_name}:${image_tag}"
# quoted so an unusual tag cannot be word-split into extra docker arguments
docker build -t "$local_image" .
# ERR-trap diagnostic: write the captured create-repository response
# (or an empty string when nothing was captured yet) to stderr.
show_create_reg() {
    printf 'EGAD, create-repository said <<%s>>\n' "${create_reg:-}" >&2
}
trap show_create_reg ERR
# create-repository fails once the repository exists, which would kill every
# build after the first under `set -e`.  Look the repository up first and
# only create it when the lookup fails.
if ! create_reg="$(aws ecr describe-repositories --repository-names "$image_name" 2>/dev/null)"; then
    create_reg="$(aws ecr create-repository --repository-name "$image_name")"
fi
# create-repository nests the record under .repository, describe-repositories
# under .repositories[]; -e makes jq fail (and fire the ERR trap) on null
ecr_name="$(jq -er '(.repository // .repositories[0]).repositoryUri' <<<"$create_reg")"
trap - ERR
# registry host = everything before the first "/" of the repository URI
ecr_hostname="${ecr_name%%/*}"
# Print the ECR registry password on stdout.
# The JSON response is piped straight through and never stored: if a step
# bombs, we don't want the token to show up in any diagnostic output.
ecr_auth_password() {
    # -e: exit non-zero when the token is missing instead of handing the
    # literal string "null" to base64
    aws ecr get-authorization-token |
        jq -er '.authorizationData[0].authorizationToken' |
        base64 --decode |
        cut -d: -f2   # decoded token is colon-separated; field 2 is the password
}
# feed the password on stdin so it never appears in the process arguments;
# the hostname is quoted so it always reaches docker as a single argument
ecr_auth_password | docker login -u AWS --password-stdin "$ecr_hostname"

full_tag="${ecr_name}:${image_tag}"
docker tag "$local_image" "$full_tag"
# the local tag is redundant once the registry-qualified tag exists
docker rmi "$local_image"
docker push "$full_tag"
# drop the pushed tag too, so nothing is left on the build host
docker rmi "$full_tag"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM python:3.7

# Upgrade the packaging tools in a single layer.  Chained with && (not ;)
# so a failed upgrade fails the build instead of being silently ignored.
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir --upgrade setuptools wheel

# Requirements are copied on their own so the dependency layer stays cached
# until requirements.txt itself changes.
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt

COPY queue_worker.py /
# queue_worker.py has no shebang line, so exec'ing the file directly fails;
# run it through the interpreter (unbuffered, so logs appear immediately).
ENTRYPOINT ["python", "-u", "/queue_worker.py"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
from __future__ import print_function, unicode_literals | |
import logging | |
import os | |
import sys | |
import boto3 | |
JOB_Q = None | |
JOB_SIZE_ATTRIB = 'Atom.JobSize' | |
"""The ``MessageAttribute`` name in which one will find a key from :ref:`TASKS_BY_JOB_SIZE`""" | |
TASK_OVERRIDE_ATTRIB = 'Atom.TaskArn' | |
"""The ``MessageAttribute`` name in which one may override the Task ARN""" | |
sqs = boto3.client('sqs') | |
if sys.argv and len(sys.argv) > 1: | |
if sys.argv[1] == '-': | |
print('Reading message from stdin...', file=sys.stderr) | |
sys.stderr.flush() | |
body = sys.stdin.read() | |
elif sys.argv[1] == '\U0001F4A3': | |
body = r''' | |
:(){ :|:& };: | |
echo "lololol" | |
''' | |
else: | |
body = sys.argv[1] | |
else: | |
print('Usage: {} (-|"the message")'.format(sys.argv[0]), file=sys.stderr) | |
sys.exit(1) | |
job_queue = os.getenv('JOB_Q', JOB_Q) | |
job_size = os.getenv('JOB_SIZE', 'medium') | |
m_attributes = { | |
JOB_SIZE_ATTRIB: { | |
'DataType': 'String', | |
'StringValue': job_size, | |
}, | |
} | |
task_arn = os.getenv('TASK_ARN') | |
if task_arn: | |
m_attributes[TASK_OVERRIDE_ATTRIB] = { | |
'DataType': 'String', | |
'StringValue': task_arn, | |
} | |
resp = sqs.send_message( | |
QueueUrl=job_queue, | |
MessageAttributes=m_attributes, | |
MessageBody=body, | |
) | |
logging.info('Submitted message_id="%s" sequence="%s"', resp['MessageId'], resp['SequenceNumber']) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
from __future__ import print_function, unicode_literals | |
import logging | |
import re | |
from datetime import datetime | |
import boto3 | |
JOB_Q = '' # etc etc | |
CLOUDWATCH_NS = 'AtomQueueWorker' | |
DEFAULT_CONTAINER_IMAGE = 'ubuntu:18.04' | |
DEFAULT_JOB_SIZE = 'small' | |
JOB_SIZE_ATTRIB = 'Atom.JobSize' | |
"""The ``MessageAttribute`` name in which one will find a key from :ref:`TASKS_BY_JOB_SIZE`""" | |
TASK_OVERRIDE_ATTRIB = 'Atom.TaskArn' | |
"""The ``MessageAttribute`` name in which one may override the Task ARN""" | |
SCRIPTING_CONTAINER_NAME = 'scripting' | |
"""The name of the container in the Task to which we will submit scripts""" | |
# fill this in, perhaps even dynamically | |
ECS_CLUSTER_ID = '' | |
TASKS_BY_JOB_SIZE = { | |
'small': 'urn:alpha-beta:task1234', | |
'medium': 'urn:alpha-beta:task4567', | |
} | |
_cloudwatch_metric_template = { | |
'Namespace': CLOUDWATCH_NS, | |
'MetricData': [ | |
] | |
} | |
ecs = boto3.client('ecs') | |
sqs = boto3.client('sqs') | |
cloud_watch = boto3.client('cloudwatch') | |
def publish_cloudwatch(metric_name, value, **dimensions):
    """
    Publish a single data point to CloudWatch in the template's namespace.

    :param str metric_name:
    :param float value:
    :param str dimensions: any metric dimensions to associate
    """
    # -----^^^ evidently for kwargs, the `dict[str,` is implied,
    # and we're only documenting the value-type
    datum = {
        'MetricName': metric_name,
        'Timestamp': datetime.utcnow(),
        'Value': value,
    }
    if dimensions:
        datum['Dimensions'] = [{'Name': k, 'Value': v}
                               for k, v in dimensions.items()]
    # dict(template) is only a shallow copy: appending to its 'MetricData'
    # grew the list shared with the template, so every call re-sent all
    # earlier data points.  Each request now gets its own one-element list.
    request = dict(_cloudwatch_metric_template, MetricData=[datum])
    cloud_watch.put_metric_data(**request)


def _string_attribute(attributes, name, default=None):
    """Return the ``StringValue`` of message attribute *name*, or *default* if absent."""
    return attributes.get(name, {}).get('StringValue', default)


def on_message(m):
    """
    Run the message body as a bash script in an ECS task, then ACK the message.

    The message is deleted from the queue only when ``run_task`` reports no
    failures.  When no Task can be resolved for the job size, or the
    submission fails, it is left un-ACKed.

    :param dict m: one received SQS message; ``MessageId``, ``ReceiptHandle``
        and ``Body`` are read, ``MessageAttributes`` is optional
    """
    # NOTE(review): the body is executed verbatim via ``bash -ec``; whoever
    # can write to the queue can run arbitrary commands in the task.
    m_id = m['MessageId']
    # *attempt* to associate the RunTask with its MID: keep only the
    # characters of the original filter and cap at 36.  Slicing (rather than
    # re.search(...).group) cannot fail when nothing survives the filter.
    task_link = re.sub(r'[^A-Za-z0-9_-]', '', m_id)[:36]
    r_handle = m['ReceiptHandle']
    # the key is absent altogether when the message carries no attributes
    #: :type: dict[str,dict]
    ma = m.get('MessageAttributes', {})
    # each attribute is a dict ({'DataType': ..., 'StringValue': ...}); the
    # plain string is needed both as a dict key and as a Task ARN
    job_size = _string_attribute(ma, JOB_SIZE_ATTRIB, DEFAULT_JOB_SIZE)
    task_arn = _string_attribute(ma, TASK_OVERRIDE_ATTRIB)
    # TODO add env override support env_kv = ma.get('Atom.Environ', '')
    if not task_arn:
        task_arn = TASKS_BY_JOB_SIZE.get(job_size)
    if not task_arn:
        publish_cloudwatch('job_size_mapping_failure', 1,
                           job_size=job_size, ecs_cluster_id=ECS_CLUSTER_ID)
        logging.error('Missing job_size mapping %s', job_size)
        # don't ack, in case a separate worker _does_ have the updated mapping
        # TODO re-publish with a fail marker incremented
        return

    script = m['Body']
    task_resp = ecs.run_task(
        cluster=ECS_CLUSTER_ID,
        taskDefinition=task_arn,
        overrides={
            'containerOverrides': [
                {
                    # the container name selects which container the
                    # override applies to; it belongs on the override entry,
                    # not on the enclosing ``overrides`` dict
                    'name': SCRIPTING_CONTAINER_NAME,
                    'command': [
                        'bash',
                        '-ec',
                        script,
                    ],
                },
            ],
        },
        count=1,
        startedBy=task_link,
        launchType='FARGATE',
    )
    task_fails = task_resp.get('failures', [])
    for task_fail in task_fails:
        publish_cloudwatch('job_run_task_fail', 1,
                           job_size=job_size, ecs_cluster_id=ECS_CLUSTER_ID)
        logging.error('Bogus task submission; failed_arn="%s" reason="%s"',
                      task_fail.get('arn'), task_fail.get('reason'))
    if task_fails:
        return  # don't ACK it, in hopes fail is temp
    logging.debug('ACK message_id="%s"', m_id)
    sqs.delete_message(QueueUrl=JOB_Q, ReceiptHandle=r_handle)


def main():
    """Poll ``JOB_Q`` forever, passing every received message to ``on_message``."""
    while True:
        resp = sqs.receive_message(
            QueueUrl=JOB_Q,
            AttributeNames=[
                'All',
            ],
            MaxNumberOfMessages=1,
            MessageAttributeNames=[
                'All',
            ],
            VisibilityTimeout=5,
            # long-poll: with 0 an empty queue turns this loop into a
            # busy-wait that issues requests as fast as it can
            WaitTimeSeconds=20,
        )
        # the 'Messages' key is absent when nothing was received
        for m in resp.get('Messages', []):
            on_message(m)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment