'''
Manages Icinga passive check delivery
'''
import logging
import logging.handlers
import os
import signal

LOG_LEVELS = (logging.WARNING, logging.INFO, logging.DEBUG)
LOG_LEVEL_INDEX = 0

# logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s')
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(threadName)s %(message)s")
log = logging.getLogger('icinga_daemon')
log.setLevel(LOG_LEVELS[LOG_LEVEL_INDEX])

def switch_loglevel(signum, stack):
    try:
        global LOG_LEVEL_INDEX
        current_log_index = LOG_LEVEL_INDEX
        current_log_index += 1
        if current_log_index >= len(LOG_LEVELS):
            current_log_index = 0
        LOG_LEVEL_INDEX = current_log_index
        # print "new log level %d, %d" % (LOG_LEVEL_INDEX, LOG_LEVELS[LOG_LEVEL_INDEX])
    except Exception, e:
        print "exception %s in signal handler for %d, stack = %s" % (e, signum, stack)

signal.signal(signal.SIGUSR1, switch_loglevel)
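
# The effective log level can be cycled at runtime by sending SIGUSR1 to the
# daemon process, e.g. (assuming the process can be found by its script name):
#   kill -USR1 $(pgrep -f icinga_daemon)
# Each signal advances one step through LOG_LEVELS:
# WARNING -> INFO -> DEBUG -> WARNING -> ...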

# log.setLevel(logging.DEBUG)
# syslog_handler = logging.handlers.SysLogHandler(address='/dev/log')
# syslog_handler.setFormatter(logging.Formatter(fmt="%s[%d]: %%(message)s" % ('icinga_daemon', os.getpid())))
# syslog_handler.setLevel(logging.WARNING)
# log.addHandler(syslog_handler)

import sys
import time
import salt.config
import salt.client
import salt.runner
import salt.utils.event
import threading
import copy
import datetime
import heapq
import random

REPORT_INTERVAL = 30  # every 30 seconds
REFRESH_MINIONS_INTERVAL = 60 * 10  # every 10 minutes
MINION_DISCOVERY_INTERVAL = 60 * 60 * 6  # every 6 hours
ICINGA_CONFIG_CREATOR_INITIAL_DELAY = 50
MINION_CHECK_INITIAL_DELAY_LOW = 75
MINION_CHECK_INITIAL_DELAY_HIGH = 240
DISCOVERY_IDENTIFIER = '__discovery__'
ICINGA_HOST = 'your.icinga.minion.id'

def get_interval(check_dict):
    seconds = 0
    seconds += int(check_dict.get('seconds', 0))
    seconds += int(check_dict.get('minutes', 0)) * 60
    seconds += int(check_dict.get('hours', 0)) * 3600
    seconds += int(check_dict.get('days', 0)) * 86400
    if seconds == 0:
        seconds = 600  # 10 minutes is the default
    if seconds < REPORT_INTERVAL:
        seconds = REPORT_INTERVAL  # an interval below REPORT_INTERVAL seconds doesn't make sense
    return seconds
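
# Example: get_interval({'minutes': 5}) returns 300, get_interval({}) falls
# back to the 600 second default, and get_interval({'seconds': 10}) is
# clamped up to REPORT_INTERVAL.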

class Task:
    """One repeating thing that can be scheduled"""
    def __init__(self, identifier, interval, start_delay=0.0):
        self.identifier = identifier
        self.interval = interval
        self.next_execution = time.time() + start_delay
        self.enabled = False

    def __repr__(self):
        return '<Task {0} interval={1}>'.format(self.identifier, self.interval)

    def log_info(self, other_tasks=[]):
        return ""

    def get_first_sort_key(self):
        return int(self.next_execution) / 5 * 5  # 5 seconds accuracy

    def get_secondary_sort_key(self):
        return ''

    def enable(self):
        self.enabled = True

    def disable(self):
        self.enabled = False

    def needs_rescheduling(self, now=False, forced_reschedule_time=None):
        if self.enabled:
            if forced_reschedule_time:
                self.next_execution = forced_reschedule_time
            elif now:
                self.next_execution = time.time()
            else:
                self.next_execution = time.time() + self.interval
            return True
        else:
            return False

    def perform(self, adapter):
        pass

    def combinable_with(self, other_tasks):
        return False

    def perform_combined(self, adapter, other_tasks):
        log.warning('Try to perform task {0} combined with {1} other tasks, but this task is not combinable!'.format(self, len(other_tasks)))
        self.perform(adapter)
        for t in other_tasks:
            t.perform(adapter)

class Debugger(Task):
    """Outputs internal data"""
    def __init__(self):
        Task.__init__(self, 'debugger', REPORT_INTERVAL * 10)

    def log_info(self, other_tasks=[]):
        return ''

    def perform(self, adapter):
        def copy_queue(shared_state):
            qlist = copy.copy(shared_state.heap)
            return qlist
        queue_as_list = sorted(adapter.shared_state.do_locked(copy_queue))
        log.debug(Debugger.format_queue(queue_as_list))

    @staticmethod
    def format_queue(queue_as_list):
        ret = ''
        for i in queue_as_list:
            ret += "{0} | {1:<30} | {2}\n".format(datetime.datetime.fromtimestamp(i[0]).strftime('%Y-%m-%d %H:%M:%S'), i[1], i[2].identifier)
        return ret

class Reporter(Task):
    """Reports check results to Icinga"""
    results = []
    results_lock = threading.RLock()

    def __init__(self):
        Task.__init__(self, 'reporter', REPORT_INTERVAL)

    def log_info(self, other_tasks=[]):
        return 'lines={0}'.format(len(Reporter.results))

    def perform(self, adapter):
        with Reporter.results_lock:
            lines = Reporter.results
            Reporter.results = []
        if len(lines) > 0:
            # result = self.adapter.client.cmd((ICINGA_HOST,), 'file.append', ['/var/lib/icinga/rw/icinga.cmd'] + lines, expr_form='list')
            result = adapter.client.cmd((ICINGA_HOST,), 'cmd.run', ['cat >> /var/lib/icinga/rw/icinga.cmd'], kwarg=dict(stdin="\n".join(lines)), expr_form='list')
            if ICINGA_HOST not in result:
                log.error("Reporter: Icinga minion down? {0}".format(result))
                with Reporter.results_lock:
                    Reporter.results.extend(lines)
                # TODO: only re-append the last X lines to not trash the memory when the monitor host is offline for a long time

    @staticmethod
    def add_result(line):
        with Reporter.results_lock:
            Reporter.results.append(line)
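
# The queued lines are written verbatim into Icinga's external command pipe
# (/var/lib/icinga/rw/icinga.cmd) on ICINGA_HOST. The minion side is assumed
# to produce standard Nagios/Icinga external command strings, for example
# (illustrative only):
#   [1414940400] PROCESS_SERVICE_CHECK_RESULT;web01;load;0;OK - load 0.10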

class RefreshMinions(Task):
    """Refreshes the list of available minions and creates MinionDiscovery tasks for new minions"""
    def __init__(self):
        Task.__init__(self, 'refresh_minions', REFRESH_MINIONS_INTERVAL)

    def perform(self, adapter):
        status = adapter.get_status_of_minions()
        if status['down']:
            for minion in sorted(status['down']):
                if adapter.knows_minion(minion):
                    log.error("Minion {0} is down (was up)".format(minion))
                    adapter.mark_minion_down(minion)
                else:
                    log.error("Minion {0} is down (never seen up before)".format(minion))
        if status['up']:
            for minion in sorted(status['up']):
                if not adapter.knows_minion(minion):
                    log.warning("Minion {0} is up".format(minion))
                    minion_task = MinionDiscovery(minion)
                    adapter.add_task(minion_task)
                adapter.mark_minion_up(minion)

class MinionTask(Task):
    """A task bound to a specific minion"""
    def __init__(self, identifier, interval, minion, start_delay=0.0):
        Task.__init__(self, identifier, interval, start_delay)
        self.minion = minion

class MinionDiscovery(MinionTask):
    """Discovers passive checks on a single minion"""
    def __init__(self, minion):
        MinionTask.__init__(self, '%s::%s' % (minion, DISCOVERY_IDENTIFIER), MINION_DISCOVERY_INTERVAL, minion)

    def log_info(self, other_tasks=[]):
        return self.minion

    def perform(self, adapter):
        checks = adapter.get_config_of_minion(self.minion)
        old_checks = adapter.get_minion_checks(self.minion)
        if checks:
            check_names = [c[0] for c in checks]
            checks = dict(checks)
            remaining_checks = list(set(old_checks) & set(check_names))
            removed_checks = list(set(old_checks) - set(check_names))
            new_checks = list(set(check_names) - set(old_checks))
            log.debug('Discovery result for {0}: new_checks = {1}, removed_checks = {2}, remaining_checks = {3}'.format(self.minion, new_checks, removed_checks, remaining_checks))
            adapter.update_minion_checks(self.minion, new_checks, removed_checks, checks)
        else:
            log.warning('Discovery task of {0} disables itself because of a communication error (minion down?)'.format(self.minion))
            self.disable()

class MinionCheck(MinionTask):
    """Executes one passive check on a minion"""
    def __init__(self, minion, check_name, check):
        MinionTask.__init__(self, '%s::%s' % (minion, check_name), get_interval(check), minion, 90.0)
        if 'retry_interval' in check:
            if isinstance(check['retry_interval'], dict):
                self.retry_interval = get_interval(check['retry_interval'])
            else:
                self.retry_interval = check['retry_interval']
        else:
            self.retry_interval = self.interval
        self.success_interval = self.interval
        self.check_name = check_name
        # self.check = check

    def get_secondary_sort_key(self):
        return self.minion

    def set_next_interval(self, ret):
        if 'retcode' in ret:
            if ret['retcode'] == 0 or ret['retcode'] == 99:
                self.interval = self.success_interval
            else:
                self.interval = self.retry_interval

    def perform(self, adapter):
        ret = adapter.execute_passive_checks_on_minion(self.minion, [self.check_name])
        self.set_next_interval(ret.get(self.check_name, {}))

    def combinable_with(self, other_tasks):
        for other_task in other_tasks:
            if not isinstance(other_task, MinionCheck) or other_task.minion != self.minion:
                return False
        return True

    def perform_combined(self, adapter, other_tasks):
        check_names = [self.check_name]
        for other_task in other_tasks:
            check_names.append(other_task.check_name)
        ret = adapter.execute_passive_checks_on_minion(self.minion, check_names)
        self.set_next_interval(ret.get(self.check_name, {}))
        for other_task in other_tasks:
            other_task.set_next_interval(ret.get(other_task.check_name, {}))

    def log_info(self, other_tasks=[]):
        checks = sorted([t.check_name for t in [self] + other_tasks])
        return '{0:<34} -> [{1:0>2}] {2}'.format(self.minion, len(checks), ', '.join(checks))

class IcingaConfigCreator(Task):
    """Creates the Icinga configuration files"""
    def __init__(self):
        Task.__init__(self, 'create_icinga_config', 100)

    def perform(self, adapter):
        self.disable()
        adapter.remove_task(self)
        adapter.create_icinga_config()

class SharedState:
    """Shared state of all workers"""
    def __init__(self):
        self.lock = threading.RLock()
        self.heap = []
        self.tasks = {}
        self.minion_checks = {}
        self.minion_spread = {}
        refresh_task = RefreshMinions()
        self.add_task(refresh_task)
        reporter_task = Reporter()
        self.add_task(reporter_task)
        self.add_task(Debugger())

    def get_minion_spread(self, minion):
        with self.lock:
            if minion not in self.minion_spread:
                self.minion_spread[minion] = random.randrange(MINION_CHECK_INITIAL_DELAY_LOW, MINION_CHECK_INITIAL_DELAY_HIGH)
            return self.minion_spread[minion]
    def has_task_identifier(self, task_identifier):
        with self.lock:
            return task_identifier in self.tasks

    def has_task_like(self, task, only_enabled=False):
        with self.lock:
            has_task = task.identifier in self.tasks
            if only_enabled:
                has_task = has_task and self.tasks[task.identifier].enabled
            return has_task

    def is_enabled(self, task_identifier):
        with self.lock:
            task = self.tasks.get(task_identifier)
            return bool(task and task.enabled)
    def add_task(self, task, only_once=False, forced_reschedule_time=None):
        with self.lock:
            if only_once and self.has_task_like(task, only_enabled=True):
                return False
            task.enable()
            log.debug('Add Task {0}'.format(task.identifier))
            if self.has_task_like(task):
                log.debug('  already have a task like this, remove the old one')
                self.remove_task(self.tasks[task.identifier])
            self.tasks[task.identifier] = task
            if isinstance(task, MinionCheck):
                if not self.minion_checks.get(task.minion, False):
                    self.minion_checks[task.minion] = [task.check_name]
                else:
                    self.minion_checks[task.minion].append(task.check_name)
            if forced_reschedule_time:
                self.reschedule_task(task, forced_reschedule_time=forced_reschedule_time)
            else:
                self.reschedule_task(task, now=True)
            return True
    def remove_task(self, task):
        with self.lock:
            log.debug('Remove Task {0}'.format(task.identifier))
            task.disable()
            if task.identifier in self.tasks:
                del self.tasks[task.identifier]
            if isinstance(task, MinionCheck) and self.minion_checks.get(task.minion, False):
                self.minion_checks[task.minion].remove(task.check_name)
            self._remove_from_heap(task.identifier)

    def remove_task_by_identifier(self, task_identifier):
        with self.lock:
            log.debug('Remove Task {0}'.format(task_identifier))
            if task_identifier in self.tasks:
                task = self.tasks[task_identifier]
                task.disable()
                if isinstance(task, MinionCheck) and self.minion_checks.get(task.minion, False):
                    self.minion_checks[task.minion].remove(task.check_name)
                del self.tasks[task_identifier]
            self._remove_from_heap(task_identifier)

    def _remove_from_heap(self, task_identifier):
        with self.lock:
            self.heap[:] = [x for x in self.heap if x[2].identifier != task_identifier]
            heapq.heapify(self.heap)
    def _queue_task(self, task):
        with self.lock:
            heapq.heappush(self.heap, [task.get_first_sort_key(), task.get_secondary_sort_key(), task])
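
    # Heap entries are mutable lists of the form
    #   [first_sort_key, secondary_sort_key, task]
    # where first_sort_key is the next execution time rounded down to 5
    # seconds and secondary_sort_key groups tasks (the minion id for
    # MinionCheck tasks) so that combinable tasks sort next to each other.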
    def reschedule_task(self, task, now=False, forced_reschedule_time=None):
        if task.needs_rescheduling(now=now, forced_reschedule_time=forced_reschedule_time):
            self._queue_task(task)
    def _get_smallest_enabled_queue_item(self):
        with self.lock:
            try:
                item = heapq.heappop(self.heap)
                while not item[2].enabled:
                    item = heapq.heappop(self.heap)
                return item
            except IndexError:
                return (None, None, None)

    def _get_next_scheduled_task(self):
        check_time, secondary_sort_key, task = self._get_smallest_enabled_queue_item()
        if check_time:
            now = time.time()
            if check_time <= now:
                return task
            else:
                with self.lock:
                    heapq.heappush(self.heap, [check_time, secondary_sort_key, task])
                return None
        else:
            return None
    def _get_combinable_tasks(self, task):
        other_tasks = []
        last_popped_item = self._get_smallest_enabled_queue_item()
        while last_popped_item[2]:
            if task.combinable_with(other_tasks + [last_popped_item[2]]):
                other_tasks.append(last_popped_item[2])
            else:
                with self.lock:
                    heapq.heappush(self.heap, last_popped_item)
                return other_tasks
            last_popped_item = self._get_smallest_enabled_queue_item()
        return other_tasks

    def do_locked(self, f):
        with self.lock:
            return f(self)

    def get_minion_checks(self, minion):
        with self.lock:
            return self.minion_checks.get(minion, [])

class SaltAdapter:
    """Encapsulates all the salt interaction"""
    def __init__(self, shared_state, client, caller, runner):
        self.shared_state = shared_state
        self.client = client
        self.caller = caller
        self.runner = runner

    def has_task_identifier(self, task_identifier):
        return self.shared_state.has_task_identifier(task_identifier)

    def has_task_like(self, task, **kwargs):
        return self.shared_state.has_task_like(task, **kwargs)

    def is_enabled(self, task_identifier):
        return self.shared_state.is_enabled(task_identifier)

    def add_task(self, task, **kwargs):
        return self.shared_state.add_task(task, **kwargs)

    def remove_task(self, task):
        return self.shared_state.remove_task(task)

    def remove_task_by_identifier(self, task_identifier):
        return self.shared_state.remove_task_by_identifier(task_identifier)

    def reschedule_task(self, task, now=False):
        return self.shared_state.reschedule_task(task, now=now)

    def get_minion_checks(self, minion):
        return self.shared_state.get_minion_checks(minion)

    def get_status_of_minions(self):
        return self.runner.cmd('manage.status', [False])

    def get_config_of_minion(self, minion):
        minion_dict = self.client.cmd((minion,), ['nagios_plugin.get_config', 'nagios_plugin.all_runable'], [[], []], expr_form='list', timeout=25)
        if minion not in minion_dict:
            log.error("could not get config for {0}".format(minion))
            return None
        monitor_checks = minion_dict[minion]['nagios_plugin.get_config']
        runnable_checks = minion_dict[minion]['nagios_plugin.all_runable']
        if isinstance(runnable_checks, basestring):
            log.error("Wrong runnable checks from {0}: {1}".format(minion, runnable_checks))
            return None
        else:
            checks = [(check_name, monitor_checks[check_name],) for check_name in runnable_checks]
            log.debug("Minion %s got checks: %s", minion, list(runnable_checks))
            return checks
    def create_icinga_config(self):
        return self.runner.cmd('icinga.create_config', [])

    def schedule_icinga_reconfiguration(self):
        create_config_task = IcingaConfigCreator()
        if not self.add_task(create_config_task, only_once=True, forced_reschedule_time=time.time() + ICINGA_CONFIG_CREATOR_INITIAL_DELAY):
            log.debug("Already have an IcingaConfigCreator task")

    def execute_passive_checks_on_minion(self, minion, check_names):
        results = self.client.cmd((minion,), 'nagios_plugin.run_configs', check_names, expr_form='list', timeout=min(30, 10 * len(check_names)))
        ret = {}
        if minion in results and isinstance(results[minion], list):
            results = results[minion]
            for result in results:
                if isinstance(result, dict):
                    ret[result['service']] = result
                    if result.get('action') == 'check':
                        Reporter.add_result(result['result'])
                    else:
                        log.error("Minion {0} had an error executing nagios check {1}: {2}".format(minion, result.get('service', '<unknown>'), result))
                else:
                    log.error("Minion {0} had an error executing a nagios check ({2}): {1}".format(minion, result, check_names))
        else:
            log.error("Minion {0} had an error executing nagios checks ({2}): {1}".format(minion, results, check_names))
        return ret
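
    # For reference, a roughly equivalent manual invocation from the master
    # would look like the following (nagios_plugin is a custom execution
    # module, so the exact call is an assumption):
    #   salt -L some.minion.id nagios_plugin.run_configs check_load check_disk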
    def perform_next_task(self):
        def f(shared_state):
            task = shared_state._get_next_scheduled_task()
            if task:
                other_tasks = shared_state._get_combinable_tasks(task)
                return task, other_tasks
            else:
                return None, ()
        task, other_tasks = self.shared_state.do_locked(f)
        if task:
            start = time.time()
            start_delta = float(start - task.get_first_sort_key())
            log.info(
                "> {cls:<16} {log_info}".format(
                    cls=task.__class__.__name__,
                    log_info=task.log_info(other_tasks),
                )
            )
            try:
                if len(other_tasks) > 0:
                    task.perform_combined(self, other_tasks)
                else:
                    task.perform(self)
            except Exception:
                log.exception("Caught exception while performing task {0}".format(task))
            finally:
                end = time.time()
                log.info(
                    "< {cls:<16} executed in {time: >+9.4}s, start delta {delta: >+9.4}s".format(
                        cls=task.__class__.__name__,
                        time=float(end - start),
                        delta=start_delta,
                    )
                )
                self.reschedule_task(task)
                for other_task in other_tasks:
                    self.reschedule_task(other_task)
        else:
            # log.debug("No task to execute (queue size = {0})".format(self.queue.qsize()))
            time.sleep(0.35)  # don't burn the CPU
    def knows_minion(self, minion):
        discovery_identifier = '%s::%s' % (minion, DISCOVERY_IDENTIFIER)
        return self.has_task_identifier(discovery_identifier)

    def mark_minion_down(self, minion):
        discovery_identifier = '%s::%s' % (minion, DISCOVERY_IDENTIFIER)
        def f(shared_state):
            if shared_state.tasks.get(discovery_identifier, False):
                shared_state.tasks[discovery_identifier].disable()
            minion_checks = shared_state.minion_checks.get(minion, [])[:]
            for check_name in minion_checks:
                check_identifier = '%s::%s' % (minion, check_name)
                if shared_state.tasks.get(check_identifier, False):
                    check_task = shared_state.tasks[check_identifier]
                    shared_state.remove_task(check_task)
        self.shared_state.do_locked(f)

    def mark_minion_up(self, minion):
        discovery_identifier = '%s::%s' % (minion, DISCOVERY_IDENTIFIER)
        def f(shared_state):
            if shared_state.tasks.get(discovery_identifier, False):
                discovery_task = shared_state.tasks[discovery_identifier]
                if not discovery_task.enabled:
                    discovery_task.enable()
                    shared_state.reschedule_task(discovery_task, now=True)
        self.shared_state.do_locked(f)

    def update_minion_checks(self, minion, new_checks, removed_checks, checks):
        def f(shared_state):
            check_time = time.time() + shared_state.get_minion_spread(minion)
            if len(new_checks) > 0 or len(removed_checks) > 0:
                self.schedule_icinga_reconfiguration()
            for check_name in new_checks:
                check_task = MinionCheck(minion, check_name, checks[check_name])
                shared_state.add_task(check_task, forced_reschedule_time=check_time)
            for check_name in removed_checks:
                shared_state.remove_task_by_identifier('%s::%s' % (minion, check_name))
        self.shared_state.do_locked(f)

def perform_work(shared_state):
    client = salt.client.LocalClient()
    caller = salt.client.Caller()
    opts = salt.config.master_config('/etc/salt/master')
    runner = salt.runner.RunnerClient(opts)
    adapter = SaltAdapter(shared_state, client, caller, runner)
    while True:
        adapter.perform_next_task()

def optimizer(shared_state):
    """Reorders the tasks so that they become combinable again"""
    REORDER_WINDOW = 5 * 60  # 5 minutes
    def optimize(shared_state):
        start = time.time()
        number_of_changes = 0
        map_of_secondary_sort_keys = {}
        sorted_array = sorted(shared_state.heap)
        log.info("> Run Optimizer")
        log.debug(Debugger.format_queue(sorted_array))
        for item in sorted_array:
            if item[1] not in map_of_secondary_sort_keys:
                map_of_secondary_sort_keys[item[1]] = []
            map_of_secondary_sort_keys[item[1]].append(item)
        for secondary_sort_key in map_of_secondary_sort_keys:
            a = sorted(map_of_secondary_sort_keys[secondary_sort_key])
            current_timestamp = None
            for item in a:
                if item[1]:
                    if not current_timestamp:
                        current_timestamp = item[0]
                    if abs(item[0] - current_timestamp) < REORDER_WINDOW:
                        if current_timestamp != item[0]:
                            item[0] = current_timestamp
                            number_of_changes += 1
                    else:
                        current_timestamp = item[0]
        heapq.heapify(sorted_array)
        shared_state.heap = sorted_array
        copy_of_sorted_array = sorted(sorted_array)
        end = time.time()
        log.info("< run time {0: >+9.4}s, made {1} changes".format(float(end - start), number_of_changes))
        log.debug(Debugger.format_queue(copy_of_sorted_array))
    while True:
        time.sleep(REORDER_WINDOW)
        shared_state.do_locked(optimize)
        time.sleep(60 * 60 - REORDER_WINDOW)  # repeat every hour
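
# Worked example for the optimizer: if minion A has checks queued at t=100,
# t=160 and t=400 (all sharing the same secondary sort key), the item at
# t=160 is within REORDER_WINDOW of t=100 and is pulled back to t=100, so a
# single combined salt call can execute both; t=400 is outside the window and
# starts a new group.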

def reactor(shared_state):
    """This causes deadlocks - do not use"""
    while True:
        try:
            time.sleep(5)
            event = salt.utils.event.MasterEvent('/var/run/salt/master')
            data = event.get_event(tag='salt/presence/change')
            if data:
                log.error("reactor got notified about presence change, starting a refresh minions run {0}".format(data))
                shared_state.add_task(RefreshMinions())
        except Exception, e:
            log.error("reactor error - is the master running? {0}".format(e))

def main():
    log.warning("Start Icinga Passive Daemon")
    shared_state = SharedState()
    processes = []
    # processes.append(threading.Thread(target=reactor, name="Reactor", args=(shared_state,)))
    for x in xrange(1, 5):
        processes.append(threading.Thread(target=perform_work, name="Worker%s" % x, args=(shared_state,)))
    processes.append(threading.Thread(target=optimizer, name="Optimizer", args=(shared_state,)))
    for p in processes:
        p.start()
    try:
        current_log_index = LOG_LEVEL_INDEX
        while True:
            time.sleep(5)
            if current_log_index != LOG_LEVEL_INDEX:
                current_log_index = LOG_LEVEL_INDEX
                log.setLevel(LOG_LEVELS[current_log_index])
                log.warning(
                    "new log index is %d, log level is %s (%d) (%s)",
                    current_log_index,
                    logging.getLevelName(LOG_LEVELS[current_log_index]),
                    LOG_LEVELS[current_log_index],
                    LOG_LEVELS
                )
    except (KeyboardInterrupt, SystemExit):
        # TODO: save last_successful_index in a file
        log.warning("Exit Icinga Passive Daemon")

if __name__ == '__main__':
    main()