Last active
December 8, 2015 13:51
-
-
Save stunko/bbb5456766886ed17e30 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import uuid | |
import logging | |
from common.utils import mixutil | |
import locust | |
import gevent | |
import gevent.lock | |
import gevent.event | |
from kombu import Connection, Producer | |
from celery.result import AsyncResult as CeleryAsyncResult | |
OP_TIMEOUT = 30 | |
LOG = logging.getLogger('locust') | |
class FatmouseClient(object): | |
"""Client interface for Server and Agent API | |
""" | |
# Proxy to agent tasks to make things like | |
# fatmouse.agent(server_id).sys.set_hostname(hostname='example.com') | |
agent = None | |
# API tasks AMQP exchang | |
_api_exchange = 'api' | |
# API tasks AMQP routing key | |
_api_routing_key = 'api' | |
# API tasks message publish parameters | |
_publish_params = { | |
'content_type': 'application/json', | |
'content_encoding': 'UTF-8', | |
'mandatory': False | |
} | |
def __init__(self, broker_url, publish_params=None): | |
self.broker_url = broker_url | |
self._conn = None | |
if publish_params: | |
self._publish_params.update(publish_params) | |
def _get_connection(self): | |
if not self._conn: | |
self.conn = Connection(self.broker_url) | |
return self._conn | |
def _prepare_celery_task(self, task_name, *args, **kwargs): | |
""" | |
Generates Celery task-id and serializes task call into a message | |
return [taskId, messageBody] task-id and call message body | |
:type task_name: str | |
:param task_name: Celery task name | |
:param args, kwargs: optional Celery task parameters | |
""" | |
task_id = str(uuid.uuid4()) | |
task = { | |
'id': task_id, | |
'task': task_name, | |
'args': args, | |
'kwargs': kwargs | |
} | |
return [task_id, json.dumps(task)] | |
def create_async_result(self, task_id, result_class_name=None): | |
return AsyncResult(task_id) | |
def call_async(self, task_name, resultClassName=None, *args, **kwargs): | |
""" | |
Generic method to call Fatmouse API tasks. | |
:type task_name: str | |
:param task_name: Fatmouse API task name | |
:type resultClassName: str | |
:param resultClassNam: optional Task result wrapper class | |
:param args, kwargs: optional Task parameter | |
""" | |
(task_id, body) = self._prepare_celery_task(task_name, *args, **kwargs) | |
ch = self._get_connection().channel() | |
try: | |
producer = Producer( | |
ch, | |
exchange=self._api_exchange, | |
routing_key=self._api_routing_key, | |
auto_declare=True | |
) | |
producer.publish(body, **self._publish_params) | |
finally: | |
ch.close() | |
return self.create_async_result(task_id) | |
def consume_events_forever(self, callback): | |
pass | |
def ack_event(self, event_id): | |
pass | |
class AsyncResult(CeleryAsyncResult): | |
"""Asynchronous result of a Fatmouse task.""" | |
def __init__(self, task_id, app=None): | |
set.task_id = task_id | |
super(AsyncResult, self).__init__(task_id, app=app) | |
def get_task_id(self): | |
return self.task_id | |
class OrchestrationRule(dict): | |
def __init__(self): | |
dict.__init__(self) | |
self.script_name = str(uuid.uuid4()) + '.sh' | |
code = { | |
'script_name': self.script_name, | |
'scalr_event_server_id': '$SCALR_EVENT_SERVER_ID'} | |
code = json.dumps(code) | |
code = code.replace('"', '\\"') # Bash will echo valid JSON string | |
self.update({ | |
'target': {'type': 'farm'}, | |
'execute': { | |
'type': 'script', | |
'script_name': self.script_name, | |
'script_code': '#!/bin/bash\n echo "' + self.script_code + '"'}, | |
'blocking': True}) | |
class BootstrapTaskSet(object): | |
@classmethod | |
def setup_class(cls): | |
cls.docker = docker.Client() | |
cls.signals = mixutil.Observable('fatmouse.event') | |
# setup fatmouse client w/ event consumer | |
app = celery.Celery() | |
app.config_from_object(server.celeryfile) | |
cls.fatmouse = FatmouseClient(app) | |
def callback(event): | |
cls.events.fire('fatmouse.event', event) | |
cls.fatmouse.ack_event(event['event_id']) | |
gevent.spawn(cls.fatmouse.consume_events_forever, args=(callback, )) | |
# setup habibi farm | |
# TODO: setup farm | |
cls.habibi = habibi.api.HabibiApi() | |
cls.farm = None | |
cls.env_id = 1001 | |
setup_class.lock = gevent.lock.Lock() | |
setup_class.done = False | |
def setup(self): | |
# do scale up | |
self.server = self.habibi.create_server() | |
self.log = logging.getLogger('server:{}'.format(self.server['id'])) | |
def on_start(self): | |
# setup class once | |
s = self.setup_class | |
with s.lock: | |
if not s.done: | |
s() | |
s.done = True | |
# setup self | |
self.setup() | |
@locust.task | |
def bootstrap(self): | |
r = self.register_server() | |
self.launch_server(r) | |
self.dance_hello() | |
self.run_orchestration('HostInit') | |
self.run_init() | |
self.run_orchestration('BeforeHostUp') | |
self.run_orchestration('HostUp') | |
def register_server(self): | |
self.log.info('registering server') | |
try: | |
return self.fatmouse.register_server( | |
server_id=self.server['id'], | |
env_id=self.env_id).get() | |
except: | |
self.log.error('server registration failed!') | |
raise | |
def launch_server(self, r): | |
"""Launches agent in container. | |
:param r: An 'api.register_server' result data | |
:param r: dict | |
""" | |
got_auth = gevent.event.Event() | |
def listener(event): | |
if event['name'] == 'auth_agent' and \ | |
event['body']['server_id'] == r['server_id']: | |
got_auth.set() | |
self.signals.on('fatmouse.event', listener) | |
# actually launch container via habibi | |
env = r['agent_config'], r['celery_config'] | |
self.habibi.run_server( | |
server_id=r['server_id'], | |
env=env) | |
# wait for agent authentication | |
try: | |
got_auth.wait(OP_TIMEOUT) | |
except gevent.Timeout: | |
self.log.error('i turned gray waiting for "auth_agent" fatmouse event!') | |
raise | |
class FatmouseLocust(locust.Locust): | |
task_set = BootstrapTaskSet | |
min_wait = 5000 | |
max_wait = 15000 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment