Created
February 13, 2019 17:14
-
-
Save lighth7015/8656b54eeb503f0f062b942ab3f225bd to your computer and use it in GitHub Desktop.
System V Message Queue Listener
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2009-2012 Niels Provos, Nick Mathewson | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions | |
* are met: | |
* 1. Redistributions of source code must retain the above copyright | |
* notice, this list of conditions and the following disclaimer. | |
* 2. Redistributions in binary form must reproduce the above copyright | |
* notice, this list of conditions and the following disclaimer in the | |
* documentation and/or other materials provided with the distribution. | |
* 3. The name of the author may not be used to endorse or promote products | |
* derived from this software without specific prior written permission. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR | |
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES | |
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. | |
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, | |
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT | |
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | |
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | |
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF | |
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
*/ | |
#include <sys/types.h> | |
#include <errno.h> | |
#include <fcntl.h> | |
#include <unistd.h> | |
#include <stdlib.h> | |
#include "event2/event-config.h" | |
#include "event2/listener.h" | |
#include "event2/util.h" | |
#include "event2/event.h" | |
#include "event2/thread.h" | |
#define INTERNAL | |
#include "queue.h" | |
int ev_message_queue_enable(ev_message_queue_t instance); | |
int event_message_queue_enable(ev_message_queue_t instance); | |
int ev_message_queue_disable(ev_message_queue_t instance); | |
int event_message_queue_disable(ev_message_queue_t instance); | |
int ev_message_queue_decref_and_unlock(struct ev_message_queue *listener); | |
static | |
struct ev_message_queue_ops ev_message_queue_event_ops = { | |
event_message_queue_enable, | |
event_message_queue_disable, | |
event_message_queue_destroy, | |
NULL, /* shutdown */ | |
event_message_queue_get_fd, | |
event_message_queue_get_base | |
}; | |
int | |
ev_message_queue_enable(ev_message_queue_t instance) | |
{ | |
LOCK(instance); | |
instance->enabled = 1; | |
if (instance->cb) { | |
ev_message_queue_ops_t ref = (ev_message_queue_ops_t) | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base) | |
->base | |
.ops; | |
return ref->enable(instance); | |
} | |
UNLOCK(instance); | |
return 0; | |
} | |
int | |
ev_message_queue_disable(ev_message_queue_t instance) | |
{ | |
LOCK(instance); | |
instance->enabled = 0; | |
if (instance->cb) { | |
ev_message_queue_ops_t ref = (ev_message_queue_ops_t) | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base) | |
->base | |
.ops; | |
return ref->disable(instance); | |
} | |
UNLOCK(instance); | |
return 0; | |
} | |
int | |
event_message_queue_enable(ev_message_queue_t instance) | |
{ | |
struct ev_message_queue_event *event = | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base); | |
return event_add(&event->listener, NULL); | |
} | |
int | |
event_message_queue_disable(ev_message_queue_t instance) | |
{ | |
struct ev_message_queue_event *event = | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base); | |
return event_del(&event->listener); | |
} | |
ev_message_queue_t | |
ev_message_queue_new(struct event_base *base, ev_message_queue_cb_t cb, unsigned qflags, unsigned flags, key_t key) | |
{ | |
struct ev_message_queue_event *instance = (struct ev_message_queue_event *) | |
mm_alloc(1, sizeof(struct ev_message_queue_event)); | |
if (instance) | |
{ | |
int fd = msgget(key, 0644 | qflags); | |
instance->base.ops = &ev_message_queue_event_ops; | |
instance->base.cb = cb; | |
const ev_message_queue_server_t ipc_server = | |
instance->base.server = mm_alloc(1, ev_message_queue_server_t); | |
instance->base.flags = flags; | |
instance->base.refcnt = 1; | |
if (flags & LEV_OPT_THREADSAFE) { | |
EVTHREAD_ALLOC_LOCK(instance->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE); | |
} | |
event_assign(&instance->listener, base, fd, EV_READ|EV_PERSIST, | |
(event_message_queue_read_cb_t) event_message_queue_read_cb, instance); | |
ev_message_queue_enable(&instance->base); | |
return &instance->base; | |
} | |
return NULL; | |
} | |
void ev_message_queue_free(struct ev_message_queue *instance) | |
{ | |
LOCK(instance); | |
instance->cb = NULL; | |
instance->errorcb = NULL; | |
if (instance->ops->shutdown) | |
instance->ops->shutdown(instance); | |
ev_message_queue_decref_and_unlock(instance); | |
} | |
void event_message_queue_destroy(struct ev_message_queue *instance) | |
{ | |
struct ev_message_queue_event *event = | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base); | |
event_del(&event->listener); | |
if (instance->flags & LEV_OPT_CLOSE_ON_FREE) | |
msgctl(event_get_fd(&event->listener), IPC_RMID, NULL); | |
event_debug_unassign(&event->listener); | |
} | |
key_t ev_message_queue_get_fd(struct ev_message_queue *instance) | |
{ | |
key_t fd; | |
LOCK(instance); | |
fd = instance->ops->getfd(instance); | |
UNLOCK(instance); | |
return fd; | |
} | |
key_t event_message_queue_get_fd(struct ev_message_queue *instance) | |
{ | |
struct ev_message_queue_event *event = | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base); | |
return event_get_fd(&event->listener); | |
} | |
struct event_base * | |
ev_message_queue_get_base(struct ev_message_queue *instance) | |
{ | |
LOCK(instance); | |
struct event_base *base = instance->ops->getbase(instance); | |
UNLOCK(instance); | |
return base; | |
} | |
struct event_base * | |
event_message_queue_get_base(struct ev_message_queue *instance) | |
{ | |
struct ev_message_queue_event *event = | |
EVUTIL_UPCAST(instance, struct ev_message_queue_event, base); | |
return event_get_base(&event->listener); | |
} | |
void | |
ev_message_queue_set_cb(struct ev_message_queue *instance, | |
ev_message_queue_cb_t cb) | |
{ | |
int enable = 0; | |
LOCK(instance); | |
if (instance->enabled && !instance->cb) | |
enable = 1; | |
instance->cb = cb; | |
UNLOCK(instance); | |
} | |
void | |
ev_message_queue_set_error_cb(struct ev_message_queue *instance, | |
ev_message_queue_error_cb_t callback) | |
{ | |
LOCK(instance); | |
instance->errorcb = callback; | |
UNLOCK(instance); | |
} | |
void | |
event_message_queue_read_cb(evutil_socket_t fd, short what, ev_message_queue_t instance) | |
{ | |
int err; | |
ev_message_queue_cb_t cb; | |
ev_message_queue_error_cb_t errorcb; | |
struct msqid_ds buffer; | |
const ev_message_queue_message_t message = | |
(ev_message_queue_message_t) mm_alloc(1, sizeof(struct ev_message_queue_message));; | |
LOCK(instance); | |
while (1) { | |
if (instance->cb == NULL) { | |
UNLOCK(instance); | |
return; | |
} | |
int result = msgctl(fd, IPC_STAT, &buffer); | |
err = evutil_socket_geterror(fd); | |
printf("result = %d\n", result); | |
if (result == 0 | |
&& (buffer.msg_qbytes) > 0) | |
{ | |
const char* data = mm_alloc(buffer.msg_qbytes, sizeof(char)); | |
ev_message_buffer_t bm_address = (ev_message_buffer_t) | |
mm_alloc(1, sizeof(struct ev_message_buffer)); | |
message->length = buffer.msg_qbytes; | |
message->buffer = (void *) bm_address; | |
message->event_id = what; | |
message->server = &instance->server; | |
msgrcv(fd, bm_address, message->length, 0, IPC_NOWAIT); | |
++instance->refcnt; | |
cb = instance->cb; | |
UNLOCK(instance); | |
cb( instance, message ); | |
LOCK(instance); | |
if (instance->refcnt == 1) { | |
int freed = ev_message_queue_decref_and_unlock(instance); | |
EVUTIL_ASSERT(freed); | |
return; | |
} | |
--instance->refcnt; | |
if (!instance->enabled) { | |
// the callback could have disabled the listener | |
UNLOCK(instance); | |
return; | |
} | |
} | |
else { | |
if (result < 0) { | |
if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) | |
{ | |
UNLOCK(instance); | |
event_sock_warn(fd, "Error reading buffer RPC server buffer."); | |
return; | |
} | |
} | |
} | |
} | |
if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) | |
{ | |
UNLOCK(instance); | |
return; | |
} | |
if (instance->errorcb != NULL) | |
{ | |
++instance->refcnt; | |
errorcb = instance->errorcb; | |
UNLOCK(instance); | |
errorcb( instance, message ); | |
LOCK(instance); | |
ev_message_queue_decref_and_unlock(instance); | |
} else { | |
event_sock_warn(fd, "Error from accept() call"); | |
UNLOCK(instance); | |
} | |
} | |
int ev_message_queue_decref_and_unlock(struct ev_message_queue *listener) | |
{ | |
int refcnt = --listener->refcnt; | |
if (refcnt == 0) { | |
listener->ops->destroy(listener); | |
UNLOCK(listener); | |
EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE); | |
mm_free(listener); | |
return 1; | |
} else { | |
UNLOCK(listener); | |
return 0; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <sys/msg.h> | |
struct ev_message_queue_server; | |
typedef struct ev_message_queue_server *ev_message_queue_server_t; | |
struct ev_message_queue; | |
typedef struct ev_message_queue *ev_message_queue_t; | |
typedef void (*ev_message_queue_cb_t)(ev_message_queue_t, void *); | |
typedef void (*ev_message_queue_error_cb_t)(ev_message_queue_t, void *); | |
typedef struct ev_message_queue_message { | |
short event_id; | |
ssize_t length; | |
const void* buffer; | |
ev_message_queue_server_t* server; | |
} *ev_message_queue_message_t; | |
ev_message_queue_t ev_message_queue_new(struct event_base *base, | |
ev_message_queue_cb_t cb, unsigned qflags, unsigned flags, key_t key); | |
void ev_message_queue_free(ev_message_queue_t instance); | |
#include "queue-private.h" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "event2/event_struct.h" | |
typedef | |
struct ev_message_buffer { | |
long type; | |
char* data; | |
} *ev_message_buffer_t; | |
#ifdef INTERNAL | |
extern struct evthread_lock_callbacks evthread_lock_fns_; | |
extern void event_errx(int eval, const char *fmt, ...); | |
extern void event_sock_warn(evutil_socket_t sock, const char *fmt, ...); | |
extern void *event_mm_calloc(size_t count, size_t size); | |
#define EVENT_ERR_ABORT_ ((int)0xdeaddead) | |
#if defined(__GNUC__) && __GNUC__ >= 3 /* gcc 3.0 or later */ | |
#define EVUTIL_UNLIKELY(p) __builtin_expect(!!(p),0) | |
#else | |
#define EVUTIL_UNLIKELY(p) (p) | |
#endif | |
#if EAGAIN == EWOULDBLOCK | |
#define EVUTIL_ERR_IS_EAGAIN(e) \ | |
((e) == EAGAIN) | |
#else | |
#define EVUTIL_ERR_IS_EAGAIN(e) \ | |
((e) == EAGAIN || (e) == EWOULDBLOCK) | |
#endif | |
#define EVUTIL_ERR_ACCEPT_RETRIABLE(e) \ | |
((e) == EINTR || EVUTIL_ERR_IS_EAGAIN(e) || (e) == ECONNABORTED) | |
#define EVUTIL_ASSERT(cond) \ | |
do { \ | |
if (EVUTIL_UNLIKELY(!(cond))) { \ | |
event_errx(EVENT_ERR_ABORT_, \ | |
"%s:%d: Assertion %s failed in %s", \ | |
__FILE__,__LINE__,#cond,__func__); \ | |
/* In case a user-supplied handler tries to */ \ | |
/* return control to us, log and abort here. */ \ | |
(void)fprintf(stderr, \ | |
"%s:%d: Assertion %s failed in %s", \ | |
__FILE__,__LINE__,#cond,__func__); \ | |
abort(); \ | |
} \ | |
} while (0) | |
#define EVUTIL_UPCAST(ptr, type, field) \ | |
((type *)(((char*)(ptr)) - evutil_offsetof(type, field))) | |
/** Acquire a lock. */ | |
#define EVLOCK_LOCK(lockvar,mode) \ | |
do { \ | |
if (lockvar) \ | |
evthread_lock_fns_.lock(mode, lockvar); \ | |
} while (0) | |
/** Release a lock */ | |
#define EVLOCK_UNLOCK(lockvar,mode) \ | |
do { \ | |
if (lockvar) \ | |
evthread_lock_fns_.unlock(mode, lockvar); \ | |
} while (0) | |
/** Free a given lock, if it is present and locking is enabled. */ | |
#define EVTHREAD_FREE_LOCK(lockvar, locktype) \ | |
do { \ | |
void *lock_tmp_ = (lockvar); \ | |
if (lock_tmp_ && evthread_lock_fns_.free) \ | |
evthread_lock_fns_.free(lock_tmp_, (locktype)); \ | |
} while (0) | |
/** Allocate a new lock, and store it in lockvar, a void*. Sets lockvar to | |
NULL if locking is not enabled. */ | |
#define EVTHREAD_ALLOC_LOCK(lockvar, locktype) \ | |
((lockvar) = evthread_lock_fns_.alloc ? \ | |
evthread_lock_fns_.alloc(locktype) : NULL) | |
/** Free a given lock, if it is present and locking is enabled. */ | |
#define EVTHREAD_FREE_LOCK(lockvar, locktype) \ | |
do { \ | |
void *lock_tmp_ = (lockvar); \ | |
if (lock_tmp_ && evthread_lock_fns_.free) \ | |
evthread_lock_fns_.free(lock_tmp_, (locktype)); \ | |
} while (0) | |
#define LOCK(listener) EVLOCK_LOCK((listener)->lock, 0) | |
#define UNLOCK(listener) EVLOCK_UNLOCK((listener)->lock, 0) | |
#define mm_free(p) free(p) | |
#define mm_alloc(count, size) calloc(count, sizeof(size)) | |
typedef struct ev_message_queue_server { | |
unsigned int port; | |
key_t key; | |
short events; | |
} *ev_message_queue_server_t; | |
struct ev_message_queue; | |
typedef | |
struct ev_message_queue_ops { | |
int (*enable)(struct ev_message_queue *); | |
int (*disable)(struct ev_message_queue *); | |
void (*destroy)(struct ev_message_queue *); | |
void (*shutdown)(struct ev_message_queue *); | |
key_t (*getfd)(struct ev_message_queue *); | |
struct event_base *(*getbase)(struct ev_message_queue *); | |
} *ev_message_queue_ops_t, | |
ev_message_queue_ops; | |
typedef struct ev_message_queue { | |
ev_message_queue_ops_t ops; | |
ev_message_queue_server_t server; | |
void *lock; | |
ev_message_queue_cb_t cb; | |
ev_message_queue_error_cb_t errorcb; | |
unsigned flags; | |
unsigned enabled : 1; | |
short refcnt; | |
} *ev_message_queue_t; | |
struct ev_message_queue_event { | |
struct ev_message_queue base; | |
struct event listener; | |
}; | |
void event_message_queue_destroy(struct ev_message_queue *); | |
void event_message_queue_read_cb(evutil_socket_t, short, ev_message_queue_t ); | |
key_t event_message_queue_get_fd(struct ev_message_queue *); | |
key_t ev_message_queue_get_fd(struct ev_message_queue *instance); | |
struct event_base *event_message_queue_get_base(struct ev_message_queue *); | |
typedef | |
void (*event_message_queue_read_cb_t)(evutil_socket_t, short, void *); | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment