Last active
November 24, 2016 12:35
-
-
Save claws/7188924 to your computer and use it in GitHub Desktop.
A candidate CZMQ implementation for a ZMQ socket event monitor.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Monitor socket transport events (tcp and ipc only) | |
#include "../include/czmq.h" | |
// remove this when zsockmon.h is properly moved into ../include | |
// and added to ../include/czmq.h (via adding to model/project.xml) | |
#include "zsockmon.h" | |
// Structure of our class | |
struct _zsockmon_t { | |
zctx_t *ctx; // Private 0MQ context | |
void *pipe; // Pipe through to backend agent | |
}; | |
// Background task does the real I/O | |
static void | |
s_agent_task (void *args, zctx_t *ctx, void *pipe); | |
// -------------------------------------------------------------------------- | |
// Create a new socket monitor | |
zsockmon_t * | |
zsockmon_new (zctx_t *ctx, void *socket, int events) | |
{ | |
zsockmon_t *self = (zsockmon_t *) zmalloc (sizeof (zsockmon_t)); | |
// register a monitor endpoint on the socket | |
char *monitor_endpoint = (char *) zmalloc (100); | |
memset(monitor_endpoint, 0, 100); | |
sprintf (monitor_endpoint, "inproc://zsockmon-%p", socket); | |
int rc = zmq_socket_monitor (socket, monitor_endpoint, events); | |
assert (rc == 0); | |
// Start background agent to connect to the inproc monitor socket | |
self->pipe = zthread_fork (ctx, s_agent_task, NULL); | |
// Configure backend agent with monitor endpoint | |
zstr_send (self->pipe, "%s", monitor_endpoint); | |
free (monitor_endpoint); | |
char *status = zstr_recv (self->pipe); | |
if (strneq (status, "OK")) | |
zsockmon_destroy (&self); | |
free (status); | |
return self; | |
} | |
// -------------------------------------------------------------------------- | |
// Destructor | |
void | |
zsockmon_destroy (zsockmon_t **self_p) | |
{ | |
assert (self_p); | |
if (*self_p) { | |
zsockmon_t *self = *self_p; | |
zstr_send (self->pipe, "TERMINATE"); | |
free (zstr_recv (self->pipe)); | |
free (self); | |
*self_p = NULL; | |
} | |
} | |
// -------------------------------------------------------------------------- | |
// Get the ZeroMQ socket, for polling or receiving socket | |
// event messages from the backend agent. | |
void * | |
zsockmon_socket (zsockmon_t *self) | |
{ | |
assert (self); | |
return self->pipe; | |
} | |
// -------------------------------------------------------------------------- | |
// Enable verbose tracing of commands and activity | |
void | |
zsockmon_set_verbose (zsockmon_t *self, bool verbose) | |
{ | |
assert (self); | |
zstr_sendm (self->pipe, "VERBOSE"); | |
zstr_send (self->pipe, "%d", verbose); | |
} | |
// -------------------------------------------------------------------------- | |
// Self test of this class | |
static bool | |
s_check_event(void *s, int expected_event, bool verbose) | |
{ | |
zpoller_t *poller = zpoller_new (s, NULL); | |
void *result = zpoller_wait (poller, 500); | |
assert (result); | |
assert (result == s); | |
zmsg_t *msg = zmsg_recv (s); | |
char *evstr = zmsg_popstr (msg); | |
int actual_event = atoi (evstr); | |
free (evstr); | |
zmsg_destroy (&msg); | |
zpoller_destroy (&poller); | |
if (verbose) | |
printf("expected event (%d) == (%d) actual event: %s\n", | |
expected_event, actual_event, | |
actual_event == expected_event ? "PASS" : "FAIL"); | |
return actual_event == expected_event; | |
} | |
void | |
zsockmon_test (bool verbose) | |
{ | |
printf (" * zsockmon: "); | |
if (verbose) | |
printf("\n"); | |
bool result; | |
zctx_t *ctx = zctx_new (); | |
void *sink = zsocket_new (ctx, ZMQ_PULL); | |
zsockmon_t *sinkmon = zsockmon_new (ctx, | |
sink, ZMQ_EVENT_LISTENING | ZMQ_EVENT_ACCEPTED); | |
zsockmon_set_verbose (sinkmon, verbose); | |
void *sinkmon_sock = zsockmon_socket (sinkmon); | |
// check sink is now listening | |
zsocket_bind (sink, "tcp://*:5555"); | |
result = s_check_event (sinkmon_sock, ZMQ_EVENT_LISTENING, verbose); | |
if (verbose) | |
printf("bind - %s\n", result ? "PASS" : "FAIL"); | |
assert (result); | |
void *source = zsocket_new (ctx, ZMQ_PUSH); | |
zsockmon_t *sourcemon = zsockmon_new (ctx, | |
source, ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED); | |
zsockmon_set_verbose (sourcemon, verbose); | |
void *sourcemon_sock = zsockmon_socket (sourcemon); | |
zsocket_connect (source, "tcp://localhost:5555"); | |
// check source connected to sink | |
result = s_check_event (sourcemon_sock, ZMQ_EVENT_CONNECTED, verbose); | |
if (verbose) | |
printf("connect - %s\n", result ? "PASS" : "FAIL"); | |
assert (result); | |
// confirm sink accepted connection | |
result = s_check_event (sinkmon_sock, ZMQ_EVENT_ACCEPTED, verbose); | |
if (verbose) | |
printf("accepted - %s\n", result ? "PASS" : "FAIL"); | |
assert (result); | |
// destroy sink to trigger a disconnect event on the source | |
zsocket_destroy (ctx, sink); | |
result = s_check_event (sourcemon_sock, ZMQ_EVENT_DISCONNECTED, verbose); | |
if (verbose) | |
printf("disconnect - %s\n", result ? "PASS" : "FAIL"); | |
assert (result); | |
zsockmon_destroy (&sinkmon); | |
zsockmon_destroy (&sourcemon); | |
zctx_destroy (&ctx); | |
} | |
// -------------------------------------------------------------------------- | |
// Backend agent implementation | |
// Agent instance | |
typedef struct { | |
zctx_t *ctx; | |
void *pipe; // Socket back to application | |
void *mon; // monitor socket | |
char *endpoint; // monitor endpoint | |
bool verbose; // Trace activity to stdout | |
bool terminated; | |
} agent_t; | |
// Prototypes for local functions we use in the agent | |
static agent_t * | |
s_agent_new (zctx_t *ctx, void *pipe, char *endpoint); | |
static void | |
s_api_command (agent_t *self); | |
static void | |
s_event_recv (agent_t *self); | |
static void | |
s_agent_destroy (agent_t **self_p); | |
// This is the background task that monitors socket events | |
static void | |
s_agent_task (void *args, zctx_t *ctx, void *pipe) | |
{ | |
// read endpoint sent over pipe | |
char *endpoint = zstr_recv (pipe); | |
assert (endpoint); | |
// Create agent instance | |
agent_t *self = s_agent_new (ctx, pipe, endpoint); | |
zpoller_t *poller = zpoller_new (self->pipe, self->mon, NULL); | |
while (!zctx_interrupted) { | |
// Poll on API pipe and on monitor socket | |
void *result = zpoller_wait (poller, -1); | |
if (result == NULL) | |
break; // Interrupted | |
if (result == self->pipe) | |
s_api_command (self); | |
if (result == self->mon) | |
s_event_recv (self); | |
if (self->terminated) | |
break; | |
} | |
zpoller_destroy (&poller); | |
s_agent_destroy (&self); | |
} | |
// -------------------------------------------------------------------------- | |
// Create and initialize new agent instance | |
static agent_t * | |
s_agent_new (zctx_t *ctx, void *pipe, char *endpoint) | |
{ | |
agent_t *self = (agent_t *) malloc (sizeof (agent_t)); | |
assert (self); | |
self->ctx = ctx; | |
self->pipe = pipe; | |
self->endpoint = endpoint; | |
self->verbose = false; | |
self->terminated = false; | |
// connect to the socket monitor inproc endpoint | |
self->mon = zsocket_new (self->ctx, ZMQ_PAIR); | |
assert (self->mon); | |
if (zsocket_connect (self->mon, self->endpoint) == 0) | |
zstr_send (self->pipe, "OK"); | |
else | |
zstr_send (self->pipe, "ERROR"); | |
return self; | |
} | |
// -------------------------------------------------------------------------- | |
// Handle command from API | |
static void | |
s_api_command (agent_t *self) | |
{ | |
char *command = zstr_recv (self->pipe); | |
if (self->verbose) | |
printf ("I: received api command: %s\n", command); | |
if (streq (command, "TERMINATE")) { | |
self->terminated = true; | |
zstr_send (self->pipe, "OK"); | |
} | |
else | |
if (streq (command, "VERBOSE")) { | |
char *verbose = zstr_recv (self->pipe); | |
self->verbose = *verbose == '1'; | |
free (verbose); | |
} | |
else | |
printf ("E: unexpected API command '%s'\n", command); | |
free (command); | |
} | |
// -------------------------------------------------------------------------- | |
// Handle event from socket monitor | |
static void | |
s_event_recv (agent_t *self) | |
{ | |
zframe_t *frame; | |
zmq_event_t event; | |
char *description; | |
char addr[1025]; | |
// extract event data into event struct | |
frame = zframe_recv (self->mon); | |
// extract id of the event as bitfield | |
memcpy (&(event.event), | |
zframe_data (frame), | |
sizeof (event.event)); | |
// extract value which is either error code, fd or reconnect interval | |
memcpy (&(event.value), | |
zframe_data (frame) + sizeof (event.event), | |
sizeof (event.value)); | |
zframe_destroy (&frame); | |
// copy address part | |
frame = zframe_recv (self->mon); | |
memcpy (addr, zframe_data (frame), zframe_size (frame)); | |
*(addr + zframe_size (frame)) = '\0'; // add null terminator to address string | |
zframe_destroy (&frame); | |
switch (event.event) { | |
case ZMQ_EVENT_ACCEPTED: | |
description = "Accepted"; | |
break; | |
case ZMQ_EVENT_ACCEPT_FAILED: | |
description = "Accept failed"; | |
break; | |
case ZMQ_EVENT_BIND_FAILED: | |
description = "Bind failed"; | |
break; | |
case ZMQ_EVENT_CLOSED: | |
description = "Closed"; | |
break; | |
case ZMQ_EVENT_CLOSE_FAILED: | |
description = "Close failed"; | |
break; | |
case ZMQ_EVENT_DISCONNECTED: | |
description = "Disconnected"; | |
break; | |
case ZMQ_EVENT_CONNECTED: | |
description = "Connected"; | |
break; | |
case ZMQ_EVENT_CONNECT_DELAYED: | |
description = "Connect delayed"; | |
break; | |
case ZMQ_EVENT_CONNECT_RETRIED: | |
description = "Connect retried"; | |
break; | |
case ZMQ_EVENT_LISTENING: | |
description = "Listening"; | |
break; | |
case ZMQ_EVENT_MONITOR_STOPPED: | |
description = "Monitor stopped"; | |
break; | |
default: | |
if (self->verbose) | |
printf ("Unknown socket monitor event: %d", event.value); | |
break; | |
} | |
if (self->verbose) | |
printf ("EV: %s - %s\n", description, addr); | |
zmsg_t *msg = zmsg_new(); | |
zmsg_addstr (msg, "%d", (int) event.event ); | |
zmsg_addstr (msg, "%d", (int) event.value ); | |
zmsg_addstr (msg, "%s", addr); | |
zmsg_addstr (msg, "%s", description); | |
zmsg_send (&msg, self->pipe); | |
} | |
// -------------------------------------------------------------------------- | |
// Destroy agent instance | |
static void | |
s_agent_destroy (agent_t **self_p) | |
{ | |
assert (self_p); | |
if (*self_p) { | |
agent_t *self = *self_p; | |
free (self->endpoint); | |
free (self); | |
*self_p = NULL; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef __ZSOCKMON_H_INCLUDED__ | |
#define __ZSOCKMON_H_INCLUDED__ | |
#ifdef __cplusplus | |
extern "C" { | |
#endif | |
// Opaque class structure | |
typedef struct _zsockmon_t zsockmon_t; | |
// @interface | |
// Create a new socket monitor | |
CZMQ_EXPORT zsockmon_t * | |
zsockmon_new (zctx_t *ctx, void *socket, int events); | |
// Destroy a beacon | |
CZMQ_EXPORT void | |
zsockmon_destroy (zsockmon_t **self_p); | |
// Get zsockmon pipe socket, for polling or receiving messages | |
CZMQ_EXPORT void * | |
zsockmon_socket (zsockmon_t *self); | |
// | |
CZMQ_EXPORT void | |
zsockmon_set_verbose (zsockmon_t *self, bool verbose); | |
// Self test of this class | |
CZMQ_EXPORT void | |
zsockmon_test (bool verbose); | |
// @end | |
#ifdef __cplusplus | |
} | |
#endif | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment