Skip to content

Instantly share code, notes, and snippets.

@lighth7015
Created February 13, 2019 17:14
Show Gist options
  • Save lighth7015/8656b54eeb503f0f062b942ab3f225bd to your computer and use it in GitHub Desktop.
Save lighth7015/8656b54eeb503f0f062b942ab3f225bd to your computer and use it in GitHub Desktop.
System V Message Queue Listener
/*
* Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include "event2/event-config.h"
#include "event2/listener.h"
#include "event2/util.h"
#include "event2/event.h"
#include "event2/thread.h"
#define INTERNAL
#include "queue.h"
/* Public enable/disable entry points; both take the listener lock. */
int ev_message_queue_enable(ev_message_queue_t instance);
int ev_message_queue_disable(ev_message_queue_t instance);

/* Event-backend implementations, reached through the ops table. */
int event_message_queue_enable(ev_message_queue_t instance);
int event_message_queue_disable(ev_message_queue_t instance);

/* Drops one reference and releases the listener lock; defined further down. */
int ev_message_queue_decref_and_unlock(struct ev_message_queue *listener);
static
struct ev_message_queue_ops ev_message_queue_event_ops = {
event_message_queue_enable,
event_message_queue_disable,
event_message_queue_destroy,
NULL, /* shutdown */
event_message_queue_get_fd,
event_message_queue_get_base
};
/**
 * Mark the listener as enabled and, when a callback is installed, arm the
 * backend through ops->enable.
 *
 * @param instance Listener to enable.
 * @return The backend's enable() result, or 0 when no callback is set (the
 *         enabled flag is still recorded in that case).
 *
 * The listener lock is released on every path before returning.
 */
int
ev_message_queue_enable(ev_message_queue_t instance)
{
	int result = 0;

	LOCK(instance);
	instance->enabled = 1;
	if (instance->cb) {
		/* instance is already the base object, so its ops are reachable directly. */
		result = instance->ops->enable(instance);
	}
	UNLOCK(instance);
	return result;
}
/**
 * Mark the listener as disabled and, when a callback is installed, disarm
 * the backend through ops->disable.
 *
 * @param instance Listener to disable.
 * @return The backend's disable() result, or 0 when no callback is set (the
 *         enabled flag is still cleared in that case).
 *
 * The listener lock is released on every path before returning.
 */
int
ev_message_queue_disable(ev_message_queue_t instance)
{
	int result = 0;

	LOCK(instance);
	instance->enabled = 0;
	if (instance->cb) {
		/* instance is already the base object, so its ops are reachable directly. */
		result = instance->ops->disable(instance);
	}
	UNLOCK(instance);
	return result;
}
/**
 * Backend enable hook: adds the listener's event with no timeout.
 *
 * @param instance Base object embedded in a struct ev_message_queue_event.
 * @return Whatever event_add() returns.
 */
int
event_message_queue_enable(ev_message_queue_t instance)
{
	struct ev_message_queue_event *owner;
	int status;

	owner = EVUTIL_UPCAST(instance, struct ev_message_queue_event, base);
	status = event_add(&owner->listener, NULL);
	return status;
}
/**
 * Backend disable hook: removes the listener's event.
 *
 * @param instance Base object embedded in a struct ev_message_queue_event.
 * @return Whatever event_del() returns.
 */
int
event_message_queue_disable(ev_message_queue_t instance)
{
	struct ev_message_queue_event *owner;
	int status;

	owner = EVUTIL_UPCAST(instance, struct ev_message_queue_event, base);
	status = event_del(&owner->listener);
	return status;
}
/**
 * Create a listener for the System V message queue identified by key.
 *
 * @param base   Event base the read event is assigned to.
 * @param cb     Callback invoked for each message; may be NULL.
 * @param qflags Extra flags OR-ed with mode 0644 and passed to msgget().
 * @param flags  LEV_OPT_* listener flags (LEV_OPT_THREADSAFE allocates a lock).
 * @param key    System V IPC key of the queue.
 * @return The new listener with one reference held, or NULL when an
 *         allocation, msgget() or event_assign() fails.
 *
 * NOTE(review): msgget() returns a queue identifier rather than a file
 * descriptor; confirm that the event backend in use can watch it for
 * EV_READ.
 */
ev_message_queue_t
ev_message_queue_new(struct event_base *base, ev_message_queue_cb_t cb, unsigned qflags, unsigned flags, key_t key)
{
	struct ev_message_queue_event *instance;
	int queue_id;

	/*
	 * calloc() is called directly rather than through mm_alloc(): that
	 * macro wraps its second argument in another sizeof, so handing it a
	 * sizeof expression would request only sizeof(size_t) bytes.
	 */
	instance = calloc(1, sizeof(*instance));
	if (instance == NULL)
		return NULL;

	instance->base.server = calloc(1, sizeof(*instance->base.server));
	if (instance->base.server == NULL)
		goto fail;

	queue_id = msgget(key, 0644 | qflags);
	if (queue_id < 0)
		goto fail;

	instance->base.ops = &ev_message_queue_event_ops;
	instance->base.cb = cb;
	instance->base.flags = flags;
	instance->base.refcnt = 1;

	/* Assign the event before allocating the lock so failure needs no lock cleanup. */
	if (event_assign(&instance->listener, base, queue_id, EV_READ|EV_PERSIST,
	    (event_message_queue_read_cb_t) event_message_queue_read_cb, instance) < 0)
		goto fail;

	if (flags & LEV_OPT_THREADSAFE) {
		EVTHREAD_ALLOC_LOCK(instance->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	}

	ev_message_queue_enable(&instance->base);
	return &instance->base;

fail:
	mm_free(instance->base.server);
	mm_free(instance);
	return NULL;
}
/**
 * Release the caller's reference to a listener.
 *
 * Both callbacks are cleared under the lock, the optional shutdown hook is
 * run, and then one reference is dropped (which also releases the lock).
 *
 * @param instance Listener to release.
 */
void ev_message_queue_free(struct ev_message_queue *instance)
{
	void (*shutdown_hook)(struct ev_message_queue *);

	LOCK(instance);
	instance->cb = NULL;
	instance->errorcb = NULL;

	shutdown_hook = instance->ops->shutdown;
	if (shutdown_hook != NULL) {
		shutdown_hook(instance);
	}

	ev_message_queue_decref_and_unlock(instance);
}
/**
 * Backend destroy hook: removes the event and, when LEV_OPT_CLOSE_ON_FREE
 * is set, removes the message queue itself with IPC_RMID.
 *
 * @param instance Base object embedded in a struct ev_message_queue_event.
 */
void event_message_queue_destroy(struct ev_message_queue *instance)
{
	struct ev_message_queue_event *owner =
	    EVUTIL_UPCAST(instance, struct ev_message_queue_event, base);
	struct event *ev = &owner->listener;

	event_del(ev);
	if ((instance->flags & LEV_OPT_CLOSE_ON_FREE) != 0) {
		msgctl(event_get_fd(ev), IPC_RMID, NULL);
	}
	event_debug_unassign(ev);
}
/**
 * Return the value reported by the backend's getfd hook, read under the
 * listener lock.
 *
 * @param instance Listener to query.
 * @return The backend's getfd() result.
 */
key_t ev_message_queue_get_fd(struct ev_message_queue *instance)
{
	LOCK(instance);
	const key_t queue_id = instance->ops->getfd(instance);
	UNLOCK(instance);

	return queue_id;
}
/**
 * Backend getfd hook: reports the value the listener's event was assigned.
 *
 * @param instance Base object embedded in a struct ev_message_queue_event.
 * @return Whatever event_get_fd() returns for the event.
 */
key_t event_message_queue_get_fd(struct ev_message_queue *instance)
{
	struct ev_message_queue_event *owner;

	owner = EVUTIL_UPCAST(instance, struct ev_message_queue_event, base);
	return event_get_fd(&owner->listener);
}
/**
 * Return the event base reported by the backend's getbase hook, read under
 * the listener lock.
 *
 * @param instance Listener to query.
 * @return The backend's getbase() result.
 */
struct event_base *
ev_message_queue_get_base(struct ev_message_queue *instance)
{
	struct event_base *owner_base;

	LOCK(instance);
	owner_base = instance->ops->getbase(instance);
	UNLOCK(instance);

	return owner_base;
}
/**
 * Backend getbase hook: reports the event base of the listener's event.
 *
 * @param instance Base object embedded in a struct ev_message_queue_event.
 * @return Whatever event_get_base() returns for the event.
 */
struct event_base *
event_message_queue_get_base(struct ev_message_queue *instance)
{
	struct ev_message_queue_event *owner;

	owner = EVUTIL_UPCAST(instance, struct ev_message_queue_event, base);
	return event_get_base(&owner->listener);
}
/**
 * Install (or clear, with NULL) the message callback.
 *
 * ev_message_queue_enable() only arms the backend when a callback is
 * present, so a listener that was enabled while it had no callback is armed
 * here as soon as it receives one.
 *
 * @param instance Listener to update.
 * @param cb       New callback, or NULL to clear it.
 */
void
ev_message_queue_set_cb(struct ev_message_queue *instance,
    ev_message_queue_cb_t cb)
{
	int enable = 0;

	LOCK(instance);
	if (instance->enabled && !instance->cb)
		enable = 1;
	instance->cb = cb;
	/*
	 * The enabled flag is already set, so only the backend needs arming;
	 * the ops hook is called directly while the lock is held.
	 */
	if (enable && cb != NULL)
		instance->ops->enable(instance);
	UNLOCK(instance);
}
/**
 * Install (or clear, with NULL) the error callback. The store happens
 * under the listener lock.
 *
 * @param instance Listener to update.
 * @param callback New error callback, or NULL.
 */
void
ev_message_queue_set_error_cb(
    struct ev_message_queue *instance,
    ev_message_queue_error_cb_t callback)
{
	LOCK(instance);
	{
		instance->errorcb = callback;
	}
	UNLOCK(instance);
}
/**
 * Read-event handler for the event backend: receives queued messages and
 * passes each one to the listener's callback.
 *
 * @param fd       Value the event was registered with; used as the queue
 *                 identifier for msgctl()/msgrcv().
 * @param what     Event flags that fired; copied into message->event_id.
 * @param instance Listener the event belongs to.
 *
 * For every message, message->buffer points at the raw msgrcv() buffer (a
 * long message type followed by the text) and message->length is the number
 * of text bytes received. The message and its buffer belong to this function
 * and are released once the callback returns, so a callback must copy
 * anything it wants to keep.
 *
 * The loop ends when the queue is empty, the callback is cleared, the
 * listener is disabled, or the last reference is dropped from inside the
 * callback. Retriable errors end it quietly; other failures are reported to
 * the error callback if one is set, otherwise logged.
 */
void
event_message_queue_read_cb(evutil_socket_t fd, short what, ev_message_queue_t instance)
{
	int err = 0;
	ev_message_queue_cb_t cb;
	ev_message_queue_error_cb_t errorcb;
	struct msqid_ds info;
	ev_message_queue_message_t message;

	/*
	 * calloc() is called directly rather than through mm_alloc(): that
	 * macro wraps its second argument in another sizeof, so handing it a
	 * sizeof expression would request only sizeof(size_t) bytes.
	 */
	message = calloc(1, sizeof(*message));
	if (message == NULL) {
		event_sock_warn(fd, "Out of memory while reading message queue");
		return;
	}

	LOCK(instance);
	for (;;) {
		void *payload;
		ssize_t received;

		if (instance->cb == NULL)
			goto out_unlock;

		if (msgctl(fd, IPC_STAT, &info) < 0) {
			err = evutil_socket_geterror(fd);
			break;
		}
		/* Nothing queued (or nothing can be): wait for the next event. */
		if (info.msg_qnum == 0 || info.msg_qbytes == 0)
			goto out_unlock;

		/*
		 * msgrcv() writes a long message type followed by up to msgsz
		 * bytes of text; msg_qbytes bounds the size of any one message.
		 */
		payload = calloc(1, sizeof(long) + info.msg_qbytes);
		if (payload == NULL) {
			err = ENOMEM;
			break;
		}
		received = msgrcv(fd, payload, info.msg_qbytes, 0, IPC_NOWAIT);
		if (received < 0) {
			err = evutil_socket_geterror(fd);
			mm_free(payload);
			if (err == ENOMSG)
				goto out_unlock;	/* drained between IPC_STAT and msgrcv */
			break;
		}

		message->event_id = what;
		message->length = received;
		message->buffer = payload;
		message->server = &instance->server;

		/* Hold a reference so the listener outlives a free from the callback. */
		++instance->refcnt;
		cb = instance->cb;
		UNLOCK(instance);
		cb(instance, message);
		mm_free(payload);
		message->buffer = NULL;
		message->length = 0;
		LOCK(instance);

		if (instance->refcnt == 1) {
			/* Ours is the last reference: the callback released the listener. */
			int freed = ev_message_queue_decref_and_unlock(instance);
			EVUTIL_ASSERT(freed);
			goto out;
		}
		--instance->refcnt;
		if (!instance->enabled)
			goto out_unlock;	/* the callback disabled the listener */
	}

	/* Only reached through break, with err holding the failure code. */
	if (EVUTIL_ERR_ACCEPT_RETRIABLE(err))
		goto out_unlock;

	if (instance->errorcb != NULL) {
		++instance->refcnt;
		errorcb = instance->errorcb;
		UNLOCK(instance);
		errorcb(instance, message);
		LOCK(instance);
		ev_message_queue_decref_and_unlock(instance);
		goto out;
	}

	event_sock_warn(fd, "Error reading from message queue");
out_unlock:
	UNLOCK(instance);
out:
	mm_free(message);
}
/**
 * Drop one reference and release the listener lock; the caller must hold
 * the lock on entry.
 *
 * When the count reaches zero the backend's destroy hook runs, the lock is
 * released and freed, and the server record and the listener itself are
 * freed.
 *
 * @param listener Listener whose reference is dropped.
 * @return 1 if the listener was destroyed, 0 if references remain.
 */
int ev_message_queue_decref_and_unlock(struct ev_message_queue *listener)
{
	if (--listener->refcnt != 0) {
		UNLOCK(listener);
		return 0;
	}

	listener->ops->destroy(listener);
	UNLOCK(listener);
	EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	/* The server record is allocated together with the listener; free it too. */
	mm_free(listener->server);
	mm_free(listener);
	return 1;
}
/* Public interface of the message-queue listener. */
#pragma once
#include <sys/msg.h>
/* Opaque handle types; the structures are defined in the private header. */
struct ev_message_queue_server;
typedef struct ev_message_queue_server *ev_message_queue_server_t;
struct ev_message_queue;
typedef struct ev_message_queue *ev_message_queue_t;
/*
 * Callback types. The read callback invokes both with the listener and a
 * pointer to a struct ev_message_queue_message as the second argument.
 */
typedef void (*ev_message_queue_cb_t)(ev_message_queue_t, void *);
typedef void (*ev_message_queue_error_cb_t)(ev_message_queue_t, void *);
/*
 * Message descriptor handed to the callbacks by the read callback.
 */
typedef struct ev_message_queue_message {
/* Event flags the read callback was invoked with. */
short event_id;
/* Size in bytes associated with buffer, as filled in by the read callback. */
ssize_t length;
/* Receive buffer for the message. */
const void* buffer;
/* Address of the listener's server field. */
ev_message_queue_server_t* server;
} *ev_message_queue_message_t;
/*
 * Create a listener for the queue identified by key. qflags is OR-ed into
 * the msgget() flags; flags carries LEV_OPT_* listener options. Returns
 * NULL when the listener cannot be allocated.
 */
ev_message_queue_t ev_message_queue_new(struct event_base *base,
ev_message_queue_cb_t cb, unsigned qflags, unsigned flags, key_t key);
/* Clear the callbacks and drop the caller's reference to the listener. */
void ev_message_queue_free(ev_message_queue_t instance);
#include "queue-private.h"
#include "event2/event_struct.h"
/*
 * Message layout: a long type followed by a data member.
 * NOTE(review): msgrcv() stores the message text inline after the type,
 * whereas data here is a pointer -- confirm the intended layout.
 */
typedef
struct ev_message_buffer {
long type;
char* data;
} *ev_message_buffer_t;
#ifdef INTERNAL
/*
 * Symbols declared by hand rather than through a header.
 * NOTE(review): presumably these come from libevent's internals -- verify
 * that the linked library exports them.
 */
extern struct evthread_lock_callbacks evthread_lock_fns_;
extern void event_errx(int eval, const char *fmt, ...);
extern void event_sock_warn(evutil_socket_t sock, const char *fmt, ...);
extern void *event_mm_calloc(size_t count, size_t size);
/* Value passed to event_errx() as eval when an assertion fails. */
#define EVENT_ERR_ABORT_ ((int)0xdeaddead)
/* Branch hint: tells GCC 3+ the condition is expected to be false. */
#if defined(__GNUC__) && __GNUC__ >= 3 /* gcc 3.0 or later */
#define EVUTIL_UNLIKELY(p) __builtin_expect(!!(p),0)
#else
#define EVUTIL_UNLIKELY(p) (p)
#endif
/* True when e is EAGAIN, or EWOULDBLOCK where that is a distinct value. */
#if EAGAIN == EWOULDBLOCK
#define EVUTIL_ERR_IS_EAGAIN(e) \
((e) == EAGAIN)
#else
#define EVUTIL_ERR_IS_EAGAIN(e) \
((e) == EAGAIN || (e) == EWOULDBLOCK)
#endif
/* True for error codes classified as retriable: EINTR, EAGAIN, ECONNABORTED. */
#define EVUTIL_ERR_ACCEPT_RETRIABLE(e) \
((e) == EINTR || EVUTIL_ERR_IS_EAGAIN(e) || (e) == ECONNABORTED)
/*
 * Assertion macro: when cond is false, reports the file, line, expression
 * and function through event_errx(), then prints the same text to stderr
 * and calls abort().
 */
#define EVUTIL_ASSERT(cond) \
do { \
if (EVUTIL_UNLIKELY(!(cond))) { \
event_errx(EVENT_ERR_ABORT_, \
"%s:%d: Assertion %s failed in %s", \
__FILE__,__LINE__,#cond,__func__); \
/* In case a user-supplied handler tries to */ \
/* return control to us, log and abort here. */ \
(void)fprintf(stderr, \
"%s:%d: Assertion %s failed in %s", \
__FILE__,__LINE__,#cond,__func__); \
abort(); \
} \
} while (0)
/*
 * Given ptr, a pointer to the member named field inside an object of the
 * given type, yields a pointer to the enclosing object by subtracting the
 * member's offset.
 */
#define EVUTIL_UPCAST(ptr, type, field) \
((type *)(((char*)(ptr)) - evutil_offsetof(type, field)))
/** Acquire a lock through evthread_lock_fns_.lock; does nothing when
lockvar is NULL. */
#define EVLOCK_LOCK(lockvar,mode) \
do { \
if (lockvar) \
evthread_lock_fns_.lock(mode, lockvar); \
} while (0)
/** Release a lock through evthread_lock_fns_.unlock; does nothing when
lockvar is NULL. */
#define EVLOCK_UNLOCK(lockvar,mode) \
do { \
if (lockvar) \
evthread_lock_fns_.unlock(mode, lockvar); \
} while (0)
/** Free a given lock, if it is present and locking is enabled (that is,
a free callback is installed). lockvar is evaluated once. */
#define EVTHREAD_FREE_LOCK(lockvar, locktype) \
do { \
void *lock_tmp_ = (lockvar); \
if (lock_tmp_ && evthread_lock_fns_.free) \
evthread_lock_fns_.free(lock_tmp_, (locktype)); \
} while (0)
/**
 * Allocate a new lock and store it in lockvar, a void *. Sets lockvar to
 * NULL when no alloc callback is installed (locking is not enabled).
 *
 * EVTHREAD_FREE_LOCK is already defined just above, so its definition is
 * not repeated here.
 */
#define EVTHREAD_ALLOC_LOCK(lockvar, locktype) \
	((lockvar) = evthread_lock_fns_.alloc ? \
	    evthread_lock_fns_.alloc(locktype) : NULL)
/* Lock / unlock a listener through its lock member, using mode 0. */
#define LOCK(listener) EVLOCK_LOCK((listener)->lock, 0)
#define UNLOCK(listener) EVLOCK_UNLOCK((listener)->lock, 0)
/* Release memory obtained from mm_alloc(). */
#define mm_free(p) free(p)
/*
 * Zeroed allocation of count elements. The second argument is wrapped in
 * sizeof, so it must be a type name or an object: passing an expression
 * that is already a sizeof (such as sizeof(struct foo)) yields
 * sizeof(size_t) per element, not the structure size.
 */
#define mm_alloc(count, size) calloc(count, sizeof(size))
/*
 * Per-listener server record. The fields are not assigned anywhere in this
 * file beyond the zero-filled allocation.
 * NOTE(review): field meanings below are inferred from their names -- confirm.
 */
typedef struct ev_message_queue_server {
unsigned int port;
/* presumably the System V IPC key of the queue */
key_t key;
/* presumably an event-flag mask */
short events;
} *ev_message_queue_server_t;
struct ev_message_queue;
/*
 * Backend operations table. Every hook receives the listener's base object.
 */
typedef
struct ev_message_queue_ops {
/* Arm the backend; result is returned by ev_message_queue_enable(). */
int (*enable)(struct ev_message_queue *);
/* Disarm the backend; result is returned by ev_message_queue_disable(). */
int (*disable)(struct ev_message_queue *);
/* Called when the reference count reaches zero. */
void (*destroy)(struct ev_message_queue *);
/* Optional; called from ev_message_queue_free() when non-NULL. */
void (*shutdown)(struct ev_message_queue *);
/* Value returned by ev_message_queue_get_fd(). */
key_t (*getfd)(struct ev_message_queue *);
/* Value returned by ev_message_queue_get_base(). */
struct event_base *(*getbase)(struct ev_message_queue *);
} *ev_message_queue_ops_t,
ev_message_queue_ops;
/*
 * Backend-independent listener state.
 */
typedef struct ev_message_queue {
/* Backend operations table. */
ev_message_queue_ops_t ops;
/* Server record allocated when the listener is created. */
ev_message_queue_server_t server;
/* Lock handle; only allocated when LEV_OPT_THREADSAFE is requested. */
void *lock;
/* Message callback; may be NULL. */
ev_message_queue_cb_t cb;
/* Error callback; may be NULL. */
ev_message_queue_error_cb_t errorcb;
/* LEV_OPT_* flags given at creation. */
unsigned flags;
/* Set by ev_message_queue_enable(), cleared by ev_message_queue_disable(). */
unsigned enabled : 1;
/* Reference count; the listener is destroyed when it reaches zero. */
short refcnt;
} *ev_message_queue_t;
/*
 * Event-backed listener: the generic base followed by the struct event that
 * watches the queue. EVUTIL_UPCAST recovers this object from a pointer to
 * base.
 */
struct ev_message_queue_event {
struct ev_message_queue base;
struct event listener;
};
/* Event-backend hooks referenced by the ops table, plus the read callback. */
void event_message_queue_destroy(struct ev_message_queue *);
void event_message_queue_read_cb(evutil_socket_t, short, ev_message_queue_t );
key_t event_message_queue_get_fd(struct ev_message_queue *);
/* Locked accessor that forwards to the backend's getfd hook. */
key_t ev_message_queue_get_fd(struct ev_message_queue *instance);
struct event_base *event_message_queue_get_base(struct ev_message_queue *);
/* Callback signature used when the read callback is cast for event_assign(). */
typedef
void (*event_message_queue_read_cb_t)(evutil_socket_t, short, void *);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment