Skip to content

Instantly share code, notes, and snippets.

@netoisc
Created August 25, 2016 18:43
Show Gist options
  • Select an option

  • Save netoisc/e0742f607f66d006e27de33f8df7e042 to your computer and use it in GitHub Desktop.

Select an option

Save netoisc/e0742f607f66d006e27de33f8df7e042 to your computer and use it in GitHub Desktop.
A helper class that uses AWS ECS (through boto3) to register, launch, stop, and inspect tasks
import boto3
import logging
class ECS:
    """
    Flask extension that launches and manages notebook containers on AWS ECS.

    The bound Flask app's ``config`` is read for two keys:

    * ``AWS_CREDENTIALS`` -- mapping of keyword arguments forwarded to
      ``boto3.client`` (when missing/empty, boto3's default credential
      resolution is used).
    * ``AWS_ECS_CLUSTER_NAME`` -- the cluster every call is issued against.
    """

    def __init__(self, app=None):
        """
        :param app: flask app; when omitted, call :meth:`init_app` later
        """
        # Always define the attribute so an unbound extension fails with a
        # clear error instead of an AttributeError deep inside a method.
        self.app = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """
        Bind this helper to ``app`` and register it as ``app.extensions['ecs']``.

        :param app: flask app
        """
        self.app = app
        if not hasattr(self.app, 'extensions'):
            self.app.extensions = {}
        self.app.extensions['ecs'] = self

    def _config(self, key):
        """Return ``app.config[key]`` (``None`` if unset); requires a bound app."""
        if self.app is None:
            raise RuntimeError(
                'ECS extension is not bound to an app; call init_app() first'
            )
        return self.app.config.get(key)

    def _client(self, service):
        """Build a boto3 client for ``service`` using the configured credentials."""
        # Resolve the config first so an unbound extension fails before any
        # AWS client construction is attempted.
        credentials = self._config('AWS_CREDENTIALS') or {}
        return boto3.client(service, **credentials)

    def _cluster(self):
        """Return the configured ECS cluster name."""
        return self._config('AWS_ECS_CLUSTER_NAME')

    def launch_notebook(self, family):
        """
        Run one task from the task definition ``family``.

        :param family: task definition family (or ``family:revision``)
        :return: ``(task_arn, container_instance_arn)`` of the first task
            started; either element is ``None`` when the response does not
            carry it, and both are ``None`` when no task was started
        """
        response = self._client('ecs').run_task(
            cluster=self._cluster(),
            taskDefinition=family,
            count=1,
        ) or {}
        # TODO: save response in S3 or DynamoDB ?

        # Surface placement failures instead of silently returning Nones.
        for failure in response.get('failures') or []:
            logging.error(
                'ECS could not run task %s: %s', family, failure.get('reason')
            )

        tasks = response.get('tasks') or []
        if not tasks:
            return None, None
        first = tasks[0]
        # .get(): the container instance ARN is not guaranteed to be present.
        return first.get('taskArn'), first.get('containerInstanceArn')

    def stop_task(self, task_arn):
        """
        Stop the task identified by ``task_arn``.

        :param task_arn: ARN (or id) of the task to stop
        :return: ``True`` once the stop request has been issued
        """
        self._client('ecs').stop_task(
            cluster=self._cluster(),
            task=task_arn,
        )
        return True

    def register_task(self, family, **kwargs):
        """
        Register a single-container task definition with one host volume.

        Required keyword arguments (``KeyError`` if missing): ``name``,
        ``image_name``, ``cpu``, ``volume_source_path``.

        Optional keyword arguments: ``memory`` (default 350),
        ``container_port`` (default 0), ``container_path`` (default
        ``/home``), ``work_dir`` (default ``/home``), ``env_vars``
        (default ``[]``) and ``cmd`` (shell command passed to
        ``/bin/sh -c``; no command when omitted or empty).

        :param family: task definition family name
        :return: the ``register_task_definition`` response
        """
        # ``cmd`` is optional: the original indexed kwargs['cmd'] and raised
        # KeyError when it was left out, despite having an empty fallback.
        cmd = kwargs.get('cmd')
        command = ["-c", "{}".format(cmd)] if cmd else []

        container = {
            "name": kwargs['name'],
            "image": kwargs['image_name'],
            "cpu": kwargs['cpu'],
            "volumesFrom": [],
            "memory": kwargs.get('memory', 350),
            "extraHosts": [],
            "dnsServers": [],
            "disableNetworking": False,
            "dnsSearchDomains": [],
            "portMappings": [
                {
                    # hostPort 0 is what the original requested; the actual
                    # port is read back later via get_container_port().
                    "hostPort": 0,
                    "containerPort": kwargs.get('container_port', 0),
                    "protocol": "tcp",
                },
            ],
            "essential": True,
            "entryPoint": [
                "/bin/sh",
            ],
            "mountPoints": [
                {
                    "containerPath": kwargs.get('container_path') or '/home',
                    "sourceVolume": "user_vol",
                    "readOnly": False,
                },
            ],
            "ulimits": [],
            "dockerSecurityOptions": [],
            "environment": kwargs.get('env_vars', []),
            "links": [],
            "workingDirectory": kwargs.get('work_dir') or '/home',
            "readonlyRootFilesystem": False,
            "command": command,
            "dockerLabels": {},
            "privileged": False,
        }
        volumes = [
            {
                "host": {
                    "sourcePath": kwargs['volume_source_path'],
                },
                "name": "user_vol",
            },
        ]
        return self._client('ecs').register_task_definition(
            family=family,
            containerDefinitions=[container],
            volumes=volumes,
        )

    def describe_task_definition(self, family):
        """
        :param family: task definition family (or ``family:revision``)
        :return: the ``describe_task_definition`` response
        """
        return self._client('ecs').describe_task_definition(
            taskDefinition=family,
        )

    def notebook_running(self, task_arn):
        """
        Block on the ``tasks_running`` waiter for ``task_arn``.

        :param task_arn: ARN of the task to wait for
        :return: ``True`` if the waiter completed, ``False`` if it raised
            (the error is logged)
        """
        waiter = self._client('ecs').get_waiter('tasks_running')
        try:
            waiter.wait(
                cluster=self._cluster(),
                tasks=[task_arn],
            )
        except Exception as e:
            # Best-effort check: any waiter failure means "not running".
            logging.error('Error was occurred.%s', e)
            return False
        return True

    def get_container_port(self, task_arn):
        """
        Look up the host port bound to the task's first container.

        :param task_arn: ARN of the task to inspect
        :return: the ``hostPort`` of the first network binding, or ``None``
            when ``task_arn`` is empty or the response has no task,
            container or network binding
        """
        if not task_arn:
            return None
        response = self._client('ecs').describe_tasks(
            cluster=self._cluster(),
            tasks=[task_arn],
        )
        try:
            return (response
                    ['tasks'][0]
                    ['containers'][0]
                    ['networkBindings'][0]
                    ['hostPort'])
        except (KeyError, IndexError, TypeError):
            # Unknown task, or no port binding reported (yet).
            return None

    def get_ips_by_instance_arn(self, container_arn):
        """
        Resolve the EC2 addresses of an ECS container instance.

        :param container_arn: ARN of the ECS container instance
        :return: ``(public_ip, private_ip)``; an element is ``None`` when the
            instance does not report that address. Returns ``None`` when
            ``container_arn`` is empty or the instance cannot be resolved.
        """
        if not container_arn:
            return None
        described = self._client('ecs').describe_container_instances(
            cluster=self._cluster(),
            containerInstances=[container_arn],
        )
        try:
            instance_id = described['containerInstances'][0]['ec2InstanceId']
        except (KeyError, IndexError, TypeError):
            return None
        if not instance_id:
            return None

        reservations = self._client('ec2').describe_instances(
            InstanceIds=[instance_id],
        )
        try:
            instance = reservations['Reservations'][0]['Instances'][0]
        except (KeyError, IndexError, TypeError):
            return None
        # .get(): an instance without a public address has no such key.
        return instance.get('PublicIpAddress'), instance.get('PrivateIpAddress')

    def get_tasks_info(self, tasks):
        """
        :param tasks: list of task ARNs (or ids) to describe
        :return: the ``describe_tasks`` response
        """
        return self._client('ecs').describe_tasks(
            cluster=self._cluster(),
            tasks=tasks,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment