Created
December 21, 2011 11:13
-
-
Save jedi4ever/1505663 to your computer and use it in GitHub Desktop.
extended nagios-zmq module (now does notifications too)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <time.h> | |
#include <json/json.h> | |
#include <pthread.h> | |
#include <uuid/uuid.h> | |
#include "include/zhelpers.h" | |
#include "include/nagios/objects.h" | |
#include "include/nagios/nagios.h" | |
#include "include/nagios/nebstructs.h" | |
#include "include/nagios/broker.h" | |
#include "include/nagios/nebmodules.h" | |
#include "include/nagios/nebcallbacks.h" | |
/* Default TCP ports used to initialise g_zmq_in_port / g_zmq_out_port. */
#define DEFAULT_ZMQ_IN_PORT 5555
#define DEFAULT_ZMQ_OUT_PORT 6666

/* Priority value handed to write_to_all_logs() by logger2(). */
#define LG_INFO 262144
/* NOTE(review): LOG_INFO is not defined in the visible part of this file, so
 * these aliases only compile if some included header provides it (possibly
 * LG_INFO was intended). None of them is used in the visible code.
 * TODO confirm before relying on them. */
#define LG_WARN LOG_INFO
#define LG_ERR LOG_INFO
#define LG_CRIT LOG_INFO
#define LG_DEBUG LOG_INFO
#define LG_ALERT LOG_INFO

/* Size in bytes of the message buffers. Parenthesised so the macro expands
 * correctly inside larger expressions (e.g. x / MAX_MESSAGE). */
#define MAX_MESSAGE (1024*1024)

/* Module handle received in nebmodule_init(); passed to neb_register_callback(). */
void *nagios_zmq_module_handle = NULL;
/* ZeroMQ context created in start_threads2(). */
void *g_context;
/* PUB socket created by the publisher thread and used by the send_* functions. */
void *g_publisher;
/* Ports, overridable through the module arguments (see parse_arguments()). */
int g_zmq_in_port = DEFAULT_ZMQ_IN_PORT;
int g_zmq_out_port = DEFAULT_ZMQ_OUT_PORT;
/* Thread ids of the two helper threads started by start_threads2(). */
pthread_t g_zmq_forwarder_thread;
pthread_t g_zmq_publisher_thread;
NEB_API_VERSION(CURRENT_NEB_API_VERSION) | |
/*
 * Format a printf-style message, prefix it with "zmq queue: " and pass the
 * result to write_to_all_logs() together with `priority`.
 *
 * priority - forwarded unchanged as the second argument of write_to_all_logs()
 * loginfo  - printf-style format string; the variadic arguments are its operands
 *
 * The text is assembled in a fixed 8192-byte buffer; vsnprintf() is bounded by
 * the space left after the prefix, so overlong output is truncated.
 */
void logger2(int priority, const char *loginfo, ...)
{
    char line[8192];
    size_t prefix_len;
    va_list args;

    snprintf(line, 20, "zmq queue: ");
    prefix_len = strlen(line);

    va_start(args, loginfo);
    vsnprintf(line + prefix_len, sizeof(line) - prefix_len, loginfo, args);
    va_end(args);

    write_to_all_logs(line, priority);
}
void parse_arguments(const char *args) { | |
char arguments[1024]; | |
char *arg_term; | |
// no arguments | |
if(!args) return; | |
strncpy(arguments, args, 1024); | |
arg_term = strtok(arguments, " ="); | |
while(arg_term != NULL) { | |
char *key, *value; | |
key = arg_term; | |
arg_term = strtok(NULL, " ="); | |
value = arg_term; | |
if(!strncmp(key, "inport", 4)) { | |
g_zmq_in_port = atoi(value); | |
} | |
if(!strncmp(key, "outport", 4)) { | |
g_zmq_out_port = atoi(value); | |
} | |
arg_term = strtok(NULL, " ="); | |
} | |
} | |
void *zmq_publisher_start() { | |
logger2(LG_INFO, "start zmq publisher."); | |
// init socket | |
g_publisher = zmq_socket(g_context, ZMQ_PUB); | |
zmq_connect(g_publisher, "tcp://localhost:5555"); | |
} | |
void *zmq_forwarder_start() { | |
logger2(LG_INFO, "start zmq forwarder."); | |
// Socket facing clients | |
void *incoming = zmq_socket(g_context, ZMQ_SUB); | |
zmq_bind(incoming, "tcp://*:5555"); | |
zmq_setsockopt(incoming, ZMQ_SUBSCRIBE, "", 0); | |
// Socket facing services | |
void *outgoing = zmq_socket(g_context, ZMQ_PUB); | |
zmq_bind(outgoing, "tcp://*:6666"); | |
// Start built-in device | |
zmq_device(ZMQ_FORWARDER, incoming, outgoing); | |
// We never get here… | |
zmq_close(incoming); | |
zmq_close(outgoing); | |
zmq_term(g_context); | |
} | |
void start_threads2() { | |
// init zmq context | |
g_context = zmq_init(1); | |
// start forwarder thread | |
pthread_create(&g_zmq_forwarder_thread, 0, zmq_forwarder_start, (void *)0); | |
pthread_detach(g_zmq_forwarder_thread); | |
// start publisher thread | |
pthread_create(&g_zmq_publisher_thread, 0, zmq_publisher_start, (void *)0); | |
pthread_detach(g_zmq_publisher_thread); | |
} | |
/*
 * Add `key` with the string `value` to the JSON object `jobj`.
 *
 * jobj  - object to extend
 * key   - member name
 * value - NUL-terminated string to store
 *
 * Returns jobj so calls can be chained. If any argument is NULL, or the
 * string object cannot be created, nothing is added and jobj is returned
 * unchanged; this keeps optional (NULL) fields out of the output instead of
 * handing a NULL pointer to json_object_new_string().
 */
json_object * json_add_pair(json_object *jobj, char *key, char *value) {
    json_object *jstring;

    if (jobj == NULL || key == NULL || value == NULL) {
        return jobj;
    }

    jstring = json_object_new_string(value);
    if (jstring == NULL) {
        return jobj;
    }

    json_object_object_add(jobj, key, jstring);
    return jobj;
}
/* Generate an UUID string. */ | |
char * create_uuid() { | |
char *string = malloc(37); | |
uuid_t uuid; | |
uuid_generate(uuid); | |
uuid_unparse(uuid, string); | |
return string; | |
} | |
/*
 * Serialise a service check result as a JSON event, log it and publish it on
 * g_publisher.
 *
 * The event carries "id" (fresh UUID), "context" ("SERVICECHECK"), "source"
 * ("NAGIOS"), "timestamp" (current time) and a "payload" object with the
 * check fields. All values are emitted as JSON strings. String fields that
 * are NULL in check_data are left out of the payload.
 *
 * check_data - the check result to publish
 *
 * Returns 0 when the event was handed to s_send(), -1 when check_data is
 * NULL, the JSON objects could not be created, or no publisher socket is
 * available yet.
 */
int send_servicecheck2(nebstruct_service_check_data *check_data) {
    time_t ts = time(NULL);
    char cast_buffer[1024];
    const char *message;
    char *uuid;
    json_object *jevent;
    json_object *jobj;
    int rc = 0;

    if (check_data == NULL) {
        return -1;
    }

    jevent = json_object_new_object();
    jobj = json_object_new_object();
    if (jevent == NULL || jobj == NULL) {
        if (jevent != NULL) json_object_put(jevent);
        if (jobj != NULL) json_object_put(jobj);
        return -1;
    }

    // event envelope
    uuid = create_uuid();
    if (uuid != NULL) {
        json_add_pair(jevent, "id", uuid);
        free(uuid);
    }
    json_add_pair(jevent, "context", "SERVICECHECK");
    json_add_pair(jevent, "source", "NAGIOS");
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)ts);
    json_add_pair(jevent, "timestamp", cast_buffer);

    // payload
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->current_attempt);
    json_add_pair(jobj, "current_attempt", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->max_attempts);
    json_add_pair(jobj, "max_attempts", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state_type);
    json_add_pair(jobj, "state_type", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state);
    json_add_pair(jobj, "state", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)check_data->timestamp.tv_sec);
    json_add_pair(jobj, "timestamp", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%f", check_data->execution_time);
    json_add_pair(jobj, "execution_time", cast_buffer);
    if (check_data->host_name)
        json_add_pair(jobj, "hostname", check_data->host_name);
    if (check_data->service_description)
        json_add_pair(jobj, "service", check_data->service_description);
    if (check_data->output)
        json_add_pair(jobj, "output", check_data->output);
    if (check_data->perf_data)
        json_add_pair(jobj, "performance", check_data->perf_data);

    // jevent takes ownership of jobj from here on
    json_object_object_add(jevent, "payload", jobj);

    // the returned string belongs to jevent and stays valid until it is released
    message = json_object_to_json_string(jevent);
    logger2(LG_INFO, "'%s'\n", message);

    if (g_publisher != NULL && message != NULL) {
        // send straight from the serialised string; no bounded copy needed
        s_send(g_publisher, (char *)message);
    } else {
        rc = -1;
    }

    // releases jobj as well; a second put on jobj would be a double release
    json_object_put(jevent);
    return rc;
}
/*
 * Serialise a notification as a JSON event, log it and publish it on
 * g_publisher.
 *
 * The event carries "id" (fresh UUID), "context" ("NOTIFICATION"), "source"
 * ("NAGIOS"), "timestamp" (current time) and a "payload" object with
 * hostname, start_time, end_time, service_description, reason_type, state
 * and output. All values are emitted as JSON strings. String fields that are
 * NULL in notification_data are left out of the payload.
 *
 * ack_author, ack_data, escalated and contacts_notified are not included in
 * the payload.
 *
 * notification_data - the notification to publish
 *
 * Returns 0 when the event was handed to s_send(), -1 when notification_data
 * is NULL, the JSON objects could not be created, or no publisher socket is
 * available yet.
 */
int send_notification2(nebstruct_notification_data *notification_data) {
    time_t ts = time(NULL);
    char cast_buffer[1024];
    const char *message;
    char *uuid;
    json_object *jevent;
    json_object *jobj;
    int rc = 0;

    if (notification_data == NULL) {
        return -1;
    }

    jevent = json_object_new_object();
    jobj = json_object_new_object();
    if (jevent == NULL || jobj == NULL) {
        if (jevent != NULL) json_object_put(jevent);
        if (jobj != NULL) json_object_put(jobj);
        return -1;
    }

    // event envelope
    uuid = create_uuid();
    if (uuid != NULL) {
        json_add_pair(jevent, "id", uuid);
        free(uuid);
    }
    json_add_pair(jevent, "context", "NOTIFICATION");
    json_add_pair(jevent, "source", "NAGIOS");
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)ts);
    json_add_pair(jevent, "timestamp", cast_buffer);

    // payload
    if (notification_data->host_name)
        json_add_pair(jobj, "hostname", notification_data->host_name);
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)notification_data->start_time.tv_sec);
    json_add_pair(jobj, "start_time", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)notification_data->end_time.tv_sec);
    json_add_pair(jobj, "end_time", cast_buffer);
    // may be NULL (presumably for host notifications -- TODO confirm)
    if (notification_data->service_description)
        json_add_pair(jobj, "service_description", notification_data->service_description);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", notification_data->reason_type);
    json_add_pair(jobj, "reason_type", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", notification_data->state);
    json_add_pair(jobj, "state", cast_buffer);
    if (notification_data->output)
        json_add_pair(jobj, "output", notification_data->output);

    // jevent takes ownership of jobj from here on
    json_object_object_add(jevent, "payload", jobj);

    // the returned string belongs to jevent and stays valid until it is released
    message = json_object_to_json_string(jevent);
    logger2(LG_INFO, "'%s'\n", message);

    if (g_publisher != NULL && message != NULL) {
        // send straight from the serialised string; no bounded copy needed
        s_send(g_publisher, (char *)message);
    } else {
        rc = -1;
    }

    // releases jobj as well; a second put on jobj would be a double release
    json_object_put(jevent);
    return rc;
}
/*
 * Serialise a host check result as a JSON event, log it and publish it on
 * g_publisher.
 *
 * The event carries "id" (fresh UUID), "context" ("HOSTCHECK"), "source"
 * ("NAGIOS"), "timestamp" (current time) and a "payload" object with the
 * check fields. All values are emitted as JSON strings. String fields that
 * are NULL in check_data are left out of the payload.
 *
 * check_data - the check result to publish
 *
 * Returns 0 when the event was handed to s_send(), -1 when check_data is
 * NULL, the JSON objects could not be created, or no publisher socket is
 * available yet.
 */
int send_hostcheck2(nebstruct_host_check_data *check_data) {
    time_t ts = time(NULL);
    char cast_buffer[1024];
    const char *message;
    char *uuid;
    json_object *jevent;
    json_object *jobj;
    int rc = 0;

    if (check_data == NULL) {
        return -1;
    }

    jevent = json_object_new_object();
    jobj = json_object_new_object();
    if (jevent == NULL || jobj == NULL) {
        if (jevent != NULL) json_object_put(jevent);
        if (jobj != NULL) json_object_put(jobj);
        return -1;
    }

    // event envelope
    uuid = create_uuid();
    if (uuid != NULL) {
        json_add_pair(jevent, "id", uuid);
        free(uuid);
    }
    json_add_pair(jevent, "context", "HOSTCHECK");
    json_add_pair(jevent, "source", "NAGIOS");
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)ts);
    json_add_pair(jevent, "timestamp", cast_buffer);

    // payload
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->current_attempt);
    json_add_pair(jobj, "current_attempt", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->max_attempts);
    json_add_pair(jobj, "max_attempts", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state_type);
    json_add_pair(jobj, "state_type", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state);
    json_add_pair(jobj, "state", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%ld", (long)check_data->timestamp.tv_sec);
    json_add_pair(jobj, "timestamp", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%f", check_data->execution_time);
    json_add_pair(jobj, "execution_time", cast_buffer);
    if (check_data->host_name)
        json_add_pair(jobj, "hostname", check_data->host_name);
    if (check_data->output)
        json_add_pair(jobj, "output", check_data->output);
    if (check_data->perf_data)
        json_add_pair(jobj, "performance", check_data->perf_data);

    // jevent takes ownership of jobj from here on
    json_object_object_add(jevent, "payload", jobj);

    // the returned string belongs to jevent and stays valid until it is released
    message = json_object_to_json_string(jevent);
    logger2(LG_INFO, "'%s'\n", message);

    if (g_publisher != NULL && message != NULL) {
        // send straight from the serialised string; no bounded copy needed
        s_send(g_publisher, (char *)message);
    } else {
        rc = -1;
    }

    // releases jobj as well; a second put on jobj would be a double release
    json_object_put(jevent);
    return rc;
}
int broker_check2(int event_type, void *data) { | |
logger2(LG_INFO, "broker check - zmq"); | |
if (event_type == NEBCALLBACK_SERVICE_CHECK_DATA) { | |
nebstruct_service_check_data *c = (nebstruct_service_check_data *)data; | |
if (c->type == NEBTYPE_SERVICECHECK_PROCESSED) { | |
logger2(LG_INFO, "we got service check - zmq"); | |
send_servicecheck2(c); | |
} | |
} else if (event_type == NEBCALLBACK_HOST_CHECK_DATA) { | |
nebstruct_host_check_data *c = (nebstruct_host_check_data *)data; | |
if (c->type == NEBTYPE_HOSTCHECK_PROCESSED) { | |
logger2(LG_INFO, "we got host check - zmq"); | |
send_hostcheck2(c); | |
} | |
} | |
return 0; | |
} | |
/*
 * Broker callback registered for state-change events.
 * Both arguments are ignored; the function does nothing and returns 0.
 */
int broker_state2(int event_type __attribute__ ((__unused__)), void *data __attribute__ ((__unused__)))
{
    const int status = 0;

    return status;
}
int broker_process2(int event_type __attribute__ ((__unused__)), void *data) { | |
logger2(LG_INFO, "debug 1234"); | |
struct nebstruct_process_struct *ps = (struct nebstruct_process_struct *)data; | |
logger2(LG_INFO, "about to start threads if correct event"); | |
if (ps->type == NEBTYPE_PROCESS_EVENTLOOPSTART) { | |
logger2(LG_INFO, "starting threads"); | |
start_threads2(); | |
} | |
return 0; | |
} | |
int broker_notification2(int event_type __attribute__ ((__unused__)), void *data) { | |
logger2(LG_INFO, "got notification"); | |
nebstruct_notification_data *d = (nebstruct_notification_data *)data; | |
send_notification2(d); | |
return 0; | |
} | |
void register_callbacks2() { | |
neb_register_callback(NEBCALLBACK_STATE_CHANGE_DATA, nagios_zmq_module_handle, 2, broker_state2); | |
neb_register_callback(NEBCALLBACK_SERVICE_CHECK_DATA, nagios_zmq_module_handle, 2, broker_check2); | |
neb_register_callback(NEBCALLBACK_HOST_CHECK_DATA, nagios_zmq_module_handle, 2, broker_check2); | |
neb_register_callback(NEBCALLBACK_NOTIFICATION_DATA, nagios_zmq_module_handle, 2, broker_notification2); | |
// used for starting threads | |
neb_register_callback(NEBCALLBACK_PROCESS_DATA, nagios_zmq_module_handle, 2, broker_process2); | |
} | |
void deregister_callbacks2() { | |
neb_deregister_callback(NEBCALLBACK_STATE_CHANGE_DATA, broker_state2); | |
neb_deregister_callback(NEBCALLBACK_SERVICE_CHECK_DATA, broker_check2); | |
neb_deregister_callback(NEBCALLBACK_HOST_CHECK_DATA, broker_check2); | |
neb_deregister_callback(NEBCALLBACK_NOTIFICATION_DATA, broker_notification2); | |
neb_deregister_callback(NEBCALLBACK_PROCESS_DATA, broker_process2); | |
} | |
/* this function gets called when the module is loaded by the event broker */ | |
int nebmodule_init(int flags __attribute__ ((__unused__)), char *args, void *handle) { | |
nagios_zmq_module_handle = handle; | |
logger2(LG_INFO, "nagios-zmq by Marius Sturm"); | |
parse_arguments(args); | |
//register_callbacks(); | |
neb_register_callback(NEBCALLBACK_STATE_CHANGE_DATA, nagios_zmq_module_handle, 0, broker_state2); | |
neb_register_callback(NEBCALLBACK_SERVICE_CHECK_DATA, nagios_zmq_module_handle, 0, broker_check2); | |
neb_register_callback(NEBCALLBACK_HOST_CHECK_DATA, nagios_zmq_module_handle, 0, broker_check2); | |
neb_register_callback(NEBCALLBACK_NOTIFICATION_DATA, nagios_zmq_module_handle, 0, broker_notification2); | |
neb_register_callback(NEBCALLBACK_PROCESS_DATA, nagios_zmq_module_handle, 0, broker_process2); | |
logger2(LG_INFO, "successfully finished initialization"); | |
} | |
int nebmodule_deinit(int flags __attribute__ ((__unused__)), int reason __attribute__ ((__unused__))) { | |
logger2(LG_INFO, "deinitializing"); | |
deregister_callbacks2(); | |
//deinit zmq | |
zmq_close(g_publisher); | |
zmq_term(g_context); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[1324465955] zmq queue: '{ "id": "8c06b0a5-4540-444d-b883-a309be562a7d", "context": "NOTIFICATION", "source": "NAGIOS", "timestamp": "1324465955", "payload": { "hostname": "localhost", "start_time": "1324465955", "end_time": "1324465955", "service_description": "Swap Usage", "reason_type": "0", "state": "2", "output": "SWAP CRITICAL - 100% free (0 MB out of 0 MB)" } }' | |
[1324465955] zmq queue: we got service check - zmq | |
[1324465955] zmq queue: '{ "id": "aecf8a27-3fe1-4b63-88cd-5ee7e40ab2af", "context": "SERVICECHECK", "source": "NAGIOS", "timestamp": "1324465955", "payload": { "current_attempt": "1", "max_attempts": "1", "state_type": "1", "state": "2", "timestamp": "1324465955", "execution_time": "0.012393", "hostname": "localhost" , "service": "Swap Usage", "output": "SWAP CRITICAL - 100% free (0 MB out of 0 MB)", "performance": "swap= 0MB;0;0;0;0" } }' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment