Skip to content

Instantly share code, notes, and snippets.

@stunko
Last active December 8, 2015 13:51
Show Gist options
  • Save stunko/bbb5456766886ed17e30 to your computer and use it in GitHub Desktop.
Save stunko/bbb5456766886ed17e30 to your computer and use it in GitHub Desktop.
import json
import uuid
import logging
from common.utils import mixutil
import locust
import gevent
import gevent.lock
import gevent.event
from kombu import Connection, Producer
from celery.result import AsyncResult as CeleryAsyncResult
# Timeout handed to blocking waits in this module (used as the argument of
# Event.wait() while waiting for the agent authentication event); gevent
# interprets the value as seconds.
OP_TIMEOUT = 30
# Module-wide logger registered under the name 'locust'.
LOG = logging.getLogger('locust')
class FatmouseClient(object):
    """Client interface for Server and Agent API.

    API calls are serialized as JSON task messages and published to the
    API AMQP exchange through a lazily created kombu connection.
    """

    # Proxy to agent tasks to make things like
    # fatmouse.agent(server_id).sys.set_hostname(hostname='example.com')
    # NOTE(review): nothing in this class ever assigns it, so it stays None.
    agent = None
    # API tasks AMQP exchange
    _api_exchange = 'api'
    # API tasks AMQP routing key
    _api_routing_key = 'api'
    # Default API tasks message publish parameters. Treated as read-only:
    # every instance works on its own copy (see __init__).
    _publish_params = {
        'content_type': 'application/json',
        'content_encoding': 'UTF-8',
        'mandatory': False,
    }

    def __init__(self, broker_url, publish_params=None):
        """
        :param broker_url: value handed to kombu ``Connection`` on first use
        :type publish_params: dict
        :param publish_params: optional overrides for the default message
            publish parameters of this instance
        """
        self.broker_url = broker_url
        self._conn = None
        # Copy the class-level defaults so that overrides given to one
        # client never leak into other clients.
        self._publish_params = dict(self._publish_params)
        if publish_params:
            self._publish_params.update(publish_params)

    def _get_connection(self):
        """Return the broker connection, creating it on first call."""
        if self._conn is None:
            self._conn = Connection(self.broker_url)
        return self._conn

    def _prepare_celery_task(self, task_name, *args, **kwargs):
        """
        Generates Celery task-id and serializes task call into a message

        return [taskId, messageBody] task-id and call message body

        :type task_name: str
        :param task_name: Celery task name
        :param args, kwargs: optional Celery task parameters
        """
        task_id = str(uuid.uuid4())
        task = {
            'id': task_id,
            'task': task_name,
            'args': args,
            'kwargs': kwargs,
        }
        return [task_id, json.dumps(task)]

    def create_async_result(self, task_id, result_class_name=None):
        """Wrap ``task_id`` into an ``AsyncResult``.

        :param task_id: id of a published task
        :param result_class_name: accepted for API symmetry with
            ``call_async``; currently not used to pick a wrapper class
        """
        return AsyncResult(task_id)

    def call_async(self, task_name, resultClassName=None, *args, **kwargs):
        """
        Generic method to call Fatmouse API tasks.

        :type task_name: str
        :param task_name: Fatmouse API task name
        :type resultClassName: str
        :param resultClassName: optional Task result wrapper class
        :param args, kwargs: optional Task parameters
        :return: result object created by ``create_async_result``
        """
        (task_id, body) = self._prepare_celery_task(task_name, *args, **kwargs)
        ch = self._get_connection().channel()
        try:
            producer = Producer(
                ch,
                exchange=self._api_exchange,
                routing_key=self._api_routing_key,
                auto_declare=True
            )
            producer.publish(body, **self._publish_params)
        finally:
            # The channel is per-call; always release it, even when
            # publishing fails.
            ch.close()
        return self.create_async_result(
            task_id, result_class_name=resultClassName)

    def consume_events_forever(self, callback):
        """Placeholder: event consumption is not implemented yet."""
        pass

    def ack_event(self, event_id):
        """Placeholder: event acknowledgement is not implemented yet."""
        pass
class AsyncResult(CeleryAsyncResult):
    """Asynchronous result of a Fatmouse task."""

    def __init__(self, task_id, app=None):
        """
        :param task_id: id of the task this result refers to
        :param app: optional Celery application forwarded to the base class
        """
        # The base initializer stores the id on the instance (Celery keeps
        # it as ``id`` and exposes ``task_id`` as an alias), so no separate
        # assignment is needed here. The previous code assigned to the
        # builtin ``set`` type, which raises TypeError.
        super(AsyncResult, self).__init__(task_id, app=app)

    def get_task_id(self):
        """Return the id of the task this result refers to."""
        return self.id
class OrchestrationRule(dict):
    """Farm-targeted, blocking orchestration rule that runs a bash script.

    The generated script echoes a JSON document holding the script name and
    the ``$SCALR_EVENT_SERVER_ID`` placeholder. The rule itself is a dict
    with the keys ``target``, ``execute`` and ``blocking``.

    Attributes:
        script_name: randomly generated ``<uuid4>.sh`` name.
        script_code: the quote-escaped JSON payload embedded in the script.
    """

    def __init__(self):
        dict.__init__(self)
        self.script_name = str(uuid.uuid4()) + '.sh'
        payload = json.dumps({
            'script_name': self.script_name,
            'scalr_event_server_id': '$SCALR_EVENT_SERVER_ID'})
        # Escape the double quotes so that bash will echo a valid JSON string.
        # Stored on the instance: the script template below reads it (it was
        # previously only a local variable, which made this constructor fail
        # with AttributeError).
        self.script_code = payload.replace('"', '\\"')
        self.update({
            'target': {'type': 'farm'},
            'execute': {
                'type': 'script',
                'script_name': self.script_name,
                'script_code': '#!/bin/bash\n echo "' + self.script_code + '"'},
            'blocking': True})
class BootstrapTaskSet(locust.TaskSet):
    """Locust task set that walks one server through the bootstrap flow.

    Class-level resources (docker client, event observable, Fatmouse client,
    habibi API) are created once by ``setup_class``; every task set instance
    then creates its own server in ``setup``.

    Derives from ``locust.TaskSet`` because locust instantiates the task set
    with its parent and drives it through ``on_start`` and ``@locust.task``.
    NOTE(review): some locust releases define their own ``TaskSet.setup``
    hook - confirm against the installed version.
    """

    # Guards the one-time class setup across greenlets. Kept on the class
    # itself: attributes stored on the classmethod object are not reachable
    # through the bound method, and gevent.lock provides no ``Lock`` class.
    _setup_lock = gevent.lock.RLock()
    _setup_done = False

    @classmethod
    def setup_class(cls):
        """Create the resources shared by all task set instances."""
        # NOTE(review): docker, celery, server and habibi are not imported
        # in the visible part of this file - confirm the imports exist.
        cls.docker = docker.Client()
        cls.signals = mixutil.Observable('fatmouse.event')
        # setup fatmouse client w/ event consumer
        app = celery.Celery()
        app.config_from_object(server.celeryfile)
        # NOTE(review): FatmouseClient's first parameter is named broker_url
        # but a Celery app is passed here - confirm what is intended.
        cls.fatmouse = FatmouseClient(app)

        def callback(event):
            # Re-publish the event to local listeners, then acknowledge it.
            cls.signals.fire('fatmouse.event', event)
            cls.fatmouse.ack_event(event['event_id'])

        # gevent.spawn forwards positional arguments to the function.
        gevent.spawn(cls.fatmouse.consume_events_forever, callback)
        # setup habibi farm
        # TODO: setup farm
        cls.habibi = habibi.api.HabibiApi()
        cls.farm = None
        cls.env_id = 1001

    def setup(self):
        """Create this instance's server and a logger named after it."""
        # do scale up
        self.server = self.habibi.create_server()
        self.log = logging.getLogger('server:{}'.format(self.server['id']))

    def on_start(self):
        """Run the class setup once, then the per-instance setup."""
        cls = type(self)
        # setup class once
        with cls._setup_lock:
            if not cls._setup_done:
                cls.setup_class()
                cls._setup_done = True
        # setup self
        self.setup()

    @locust.task
    def bootstrap(self):
        """Register and launch the server, then run the bootstrap phases."""
        # NOTE(review): dance_hello, run_orchestration and run_init are not
        # defined in the visible part of this class - confirm they exist.
        r = self.register_server()
        self.launch_server(r)
        self.dance_hello()
        self.run_orchestration('HostInit')
        self.run_init()
        self.run_orchestration('BeforeHostUp')
        self.run_orchestration('HostUp')

    def register_server(self):
        """Register this instance's server and return the call's result.

        Failures are logged and re-raised.
        """
        # NOTE(review): the visible FatmouseClient defines no
        # register_server method - confirm how this call is resolved.
        self.log.info('registering server')
        try:
            return self.fatmouse.register_server(
                server_id=self.server['id'],
                env_id=self.env_id).get()
        except Exception:
            self.log.exception('server registration failed!')
            raise

    def launch_server(self, r):
        """Launches agent in container.

        :param r: An 'api.register_server' result data
        :type r: dict
        :raises gevent.Timeout: when no 'auth_agent' event for this server
            arrives within OP_TIMEOUT
        """
        got_auth = gevent.event.Event()

        def listener(event):
            if event['name'] == 'auth_agent' and \
                    event['body']['server_id'] == r['server_id']:
                got_auth.set()

        self.signals.on('fatmouse.event', listener)
        # actually launch container via habibi
        # NOTE(review): env is a 2-tuple of both configs - confirm this is
        # the shape run_server expects.
        env = r['agent_config'], r['celery_config']
        self.habibi.run_server(
            server_id=r['server_id'],
            env=env)
        # wait for agent authentication; Event.wait() reports a timeout by
        # returning False instead of raising, so raise explicitly.
        if not got_auth.wait(OP_TIMEOUT):
            self.log.error('i turned gray waiting for "auth_agent" fatmouse event!')
            raise gevent.Timeout(OP_TIMEOUT)
class FatmouseLocust(locust.Locust):
    """Simulated user that executes the bootstrap task set."""

    # Lower and upper bound of the wait between task executions
    # (locust documents these attributes in milliseconds).
    min_wait, max_wait = 5000, 15000
    # Task set run by every simulated user of this class.
    task_set = BootstrapTaskSet
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment