Last active
August 29, 2015 14:08
-
-
Save Asmod4n/03afadfbf3b45c250334 to your computer and use it in GitHub Desktop.
czmq http_parser wslay ruby websockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[submodule "picohttpparser"] | |
path = picohttpparser | |
url = https://github.com/h2o/picohttpparser.git |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
users | |
hallo | |
passwd = "$7$C6..../....m/68Yww6iu2cN.h/dijVZagz/K2avROoj/C6SRKzgH7$SZesp4Y0nCX39RFMrxGEUxuxZ6afiLMC8HKTuqkPHM8" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
source 'https://rubygems.org' | |
gem 'ffi-czmq', github: 'Asmod4n/ruby-ffi-czmq' | |
gem 'http_parser.rb' | |
gem 'ffi-wslay', github: 'Asmod4n/ruby-ffi-wslay' | |
gem 'hitimes' | |
gem 'pry' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
GIT | |
remote: git://github.com/Asmod4n/ruby-ffi-czmq.git | |
revision: f15a3a1a689ea61cd3c4222c615ce90e2873f9bf | |
specs: | |
ffi-czmq (0.1.1.pre) | |
ffi (>= 1.9.6) | |
GIT | |
remote: git://github.com/Asmod4n/ruby-ffi-wslay.git | |
revision: b3fffd24b74a94404b1242604af34c1fb4a0c119 | |
specs: | |
ffi-wslay (0.0.1) | |
ffi (>= 1.9.6) | |
rake | |
GEM | |
remote: https://rubygems.org/ | |
specs: | |
coderay (1.1.0) | |
ffi (1.9.6) | |
hitimes (1.2.2) | |
http_parser.rb (0.6.0) | |
method_source (0.8.2) | |
pry (0.10.1) | |
coderay (~> 1.1.0) | |
method_source (~> 0.8.1) | |
slop (~> 3.4) | |
rake (10.4.1) | |
slop (3.6.0) | |
PLATFORMS | |
ruby | |
DEPENDENCIES | |
ffi-czmq! | |
ffi-wslay! | |
hitimes | |
http_parser.rb | |
pry |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
hallo | |
*/ | |
#include <czmq.h> | |
#include <sodium.h> | |
#include "picohttpparser/picohttpparser.h" | |
#if defined (__UTYPE_LINUX) | |
#include <sys/inotify.h> | |
#define INOTIFY_EVENT_SIZE (sizeof (struct inotify_event) + NAME_MAX + 1) | |
#define INOTIFY_BUFF_SIZE (1000 * INOTIFY_EVENT_SIZE) | |
#endif | |
#define S_SELF_TAG 0x2342cafe | |
#define MAX_HEADERS 100 | |
#define MALFORMED_REQUEST "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n" | |
#define METHOD_NOT_ALLOWED "HTTP/1.1 405 Method not allowed\r\nConnection: close\r\n\r\n" | |
#define SERVICE_UNAVAILABLE "HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\n\r\n" | |
#define NOT_AUTHORIZED "HTTP/1.1 401 Not Authorized\r\nWWW-Authenticate: Basic realm=\"%s\"\r\nConnection: close\r\n\r\n" | |
#define OKAY "HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n" | |
static bool | |
strrcmp(const char* s, size_t l, const char* t) | |
{ | |
return strlen(t) == l && memcmp(s, t, l) == 0; | |
} | |
typedef struct { | |
uint32_t tag; | |
zsock_t *pipe; | |
zconfig_t *users; | |
char *not_authed; | |
zarmour_t *base64; | |
zloop_t *reactor; | |
#if defined (__UTYPE_LINUX) | |
int fd; | |
int wd; | |
#endif | |
bool verbose; | |
zsock_t *webserver; | |
int port; | |
} self_t; | |
static bool | |
s_self_is (void *self) | |
{ | |
assert (self); | |
return ((self_t *) self)->tag == S_SELF_TAG; | |
} | |
static void | |
s_self_destroy (self_t **self_p) | |
{ | |
assert (self_p); | |
if (*self_p) { | |
self_t *self = *self_p; | |
assert (s_self_is (self)); | |
self->tag = 0xDeadBeef; | |
zconfig_destroy (&self->users); | |
zstr_free (&self->not_authed); | |
zarmour_destroy (&self->base64); | |
zloop_destroy (&self->reactor); | |
#if defined (__UTYPE_LINUX) | |
if (self->fd > 0 && self->wd > 0) | |
self->wd = inotify_rm_watch (self->fd, self->wd); | |
if (self->fd > 0) | |
self->fd = close (self->fd); | |
#endif | |
zsock_destroy (&self->webserver); | |
self->port = 0; | |
zsock_set_sndtimeo (self->pipe, 0); | |
(void) zsock_signal (self->pipe, 0); | |
free (self); | |
*self_p = NULL; | |
} | |
} | |
static int | |
s_auth_basic (self_t *self, const char *auth) | |
{ | |
assert (self); | |
assert (s_self_is (self)); | |
assert (auth); | |
int rc = -1; | |
size_t decoded_size; | |
char *decoded = (char *) zarmour_decode (self->base64, auth, &decoded_size); | |
if (decoded) { | |
char *user = strsep (&decoded, ":"); | |
char *passwd = strsep (&decoded, ":"); | |
if (user && passwd) { | |
char *path = zsys_sprintf ("/users/%s/passwd", user); | |
if (path) { | |
char *str = zconfig_resolve (self->users, path, NULL); | |
zstr_free (&path); | |
if (str) | |
rc = crypto_pwhash_scryptsalsa208sha256_str_verify( | |
str, passwd, strlen (passwd)); | |
} | |
} | |
zstr_free (&decoded); | |
} | |
return rc; | |
} | |
static int | |
s_self_handle_webserver (zloop_t *reactor, zsock_t *webserver, void *arg) | |
{ | |
self_t *self = (self_t *) arg; | |
assert (self); | |
assert (s_self_is (self)); | |
zframe_t *peer_frame = zframe_recv (webserver); | |
if (!peer_frame) | |
return -1; | |
zframe_t *request = zframe_recv (webserver); | |
if (!request) { | |
zframe_destroy (&peer_frame); | |
return -1; | |
} | |
if (zframe_size (request) == 0) { | |
zframe_destroy (&peer_frame); | |
zframe_destroy (&request); | |
return 0; | |
} | |
const char* method; | |
size_t method_len; | |
const char* path; | |
size_t path_len; | |
int minor_version; | |
struct phr_header headers[MAX_HEADERS]; | |
size_t num_headers = MAX_HEADERS; | |
int rc = phr_parse_request ((const char *) zframe_data (request), | |
zframe_size (request), &method, &method_len, | |
&path, &path_len, &minor_version, | |
headers, &num_headers, 0); | |
if (rc > 0) { | |
if (strrcmp (method, method_len, "GET")) { | |
char *auth_header = NULL; | |
bool has_auth_header = false; | |
for (int i = 0; i < num_headers; ++i) { | |
if (strrcmp (headers[i].name, headers[i].name_len, "Authorization") && | |
headers[i].value_len >= 10) { | |
auth_header = strndup (headers[i].value, headers[i].value_len); | |
has_auth_header = true; | |
} | |
} | |
if (has_auth_header) { | |
if (auth_header) { | |
if (memcmp (auth_header, "Basic ", 6) == 0) { | |
if (s_auth_basic (self, auth_header + 6) == 0) | |
zframe_reset (request, OKAY, strlen (OKAY)); | |
else | |
zframe_reset (request, self->not_authed, strlen (self->not_authed)); | |
} | |
else | |
zframe_reset (request, self->not_authed, strlen (self->not_authed)); | |
zstr_free (&auth_header); | |
} | |
else | |
zframe_reset (request, SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE)); | |
} | |
else | |
zframe_reset (request, self->not_authed, strlen (self->not_authed)); | |
} | |
else | |
zframe_reset (request, METHOD_NOT_ALLOWED, strlen (METHOD_NOT_ALLOWED)); | |
} | |
else | |
zframe_reset (request, MALFORMED_REQUEST, strlen (MALFORMED_REQUEST)); | |
rc = zframe_send (&peer_frame, webserver, ZFRAME_MORE + ZFRAME_REUSE); | |
rc = zframe_send (&request, webserver, ZFRAME_MORE); | |
int sndtimeo = zsock_sndtimeo (webserver); | |
zsock_set_sndtimeo (webserver, 0); | |
rc = zframe_send (&peer_frame, webserver, ZFRAME_MORE); | |
rc = zstr_sendm (webserver, ""); | |
zsock_set_sndtimeo (webserver, sndtimeo); | |
return 0; | |
} | |
static int | |
s_self_bind (self_t *self, zmsg_t *request) | |
{ | |
int rc = -1; | |
if (!self->webserver) { | |
self->webserver = zsock_new (ZMQ_STREAM); | |
if (self->webserver) { | |
rc = zloop_reader ( self->reactor, | |
self->webserver, | |
s_self_handle_webserver, | |
self); | |
} | |
} | |
else | |
rc = 0; | |
if (rc == 0) { | |
char *endpoint = zmsg_popstr (request); | |
if (endpoint) { | |
rc = zsock_bind (self->webserver, "%s", endpoint); | |
if (rc != -1) { | |
self->port = rc; | |
rc = 0; | |
} | |
zstr_free (&endpoint); | |
} | |
else | |
rc = -1; | |
} | |
return rc; | |
} | |
static int | |
s_self_handle_pipe (zloop_t *reactor, zsock_t *pipe, void *arg) | |
{ | |
self_t *self = (self_t *) arg; | |
assert (self); | |
assert (s_self_is (self)); | |
zmsg_t *request = zmsg_recv (pipe); | |
if (!request) | |
return -1; | |
int rc = -1; | |
char *command = zmsg_popstr (request); | |
if (command) { | |
if (self->verbose) | |
zsys_info ("API command=%s", command); | |
if (streq (command, "$TERM")) { | |
s_self_destroy (&self); | |
rc = 0; | |
} | |
else | |
if (streq (command, "VERBOSE")) { | |
self->verbose = true; | |
zloop_set_verbose (self->reactor, true); | |
rc = 0; | |
} | |
else | |
if (streq (command, "QUITE")) { | |
self->verbose = false; | |
zloop_set_verbose (self->reactor, false); | |
rc = 0; | |
} | |
else | |
if (streq (command, "REALM")) { | |
char *realm = zmsg_popstr (request); | |
if (realm) { | |
zstr_free (&self->not_authed); | |
self->not_authed = zsys_sprintf (NOT_AUTHORIZED, realm); | |
if (self->not_authed) | |
rc = 0; | |
zstr_free (&realm); | |
} | |
} | |
else | |
if (streq (command, "BIND")) | |
rc = s_self_bind (self, request); | |
else | |
if (streq (command, "UNBIND")) { | |
if (self->webserver) { | |
char *endpoint = zmsg_popstr (request); | |
if (endpoint) { | |
rc = zsock_unbind (self->webserver, "%s", endpoint); | |
zstr_free (&endpoint); | |
} | |
} | |
} | |
else | |
if (streq (command, "PORT")) | |
rc = zstr_sendf (pipe, "%d", self->port); | |
else | |
if (streq (command, "LAST_ENDPOINT")) { | |
char *endpoint = zsock_last_endpoint (self->webserver); | |
if (endpoint) { | |
rc = zstr_send (pipe, endpoint); | |
zstr_free (&endpoint); | |
} | |
else { | |
rc = zstr_send (pipe, ""); | |
rc = -1; | |
} | |
} | |
else { | |
zsys_error ("invalid command: %s", command); | |
assert (false); | |
} | |
zstr_free (&command); | |
} | |
zmsg_destroy (&request); | |
return zsock_signal (pipe, rc); | |
} | |
#if defined (__UTYPE_LINUX) | |
static int | |
s_self_handle_inotify (zloop_t *loop, zmq_pollitem_t *item, void *arg) | |
{ | |
if ((item->revents & ZMQ_POLLERR) > 0) | |
return -1; | |
self_t *self = (self_t *) arg; | |
assert (self); | |
assert (s_self_is (self)); | |
char buffer[INOTIFY_BUFF_SIZE]; | |
ssize_t length = read (item->fd, buffer, INOTIFY_BUFF_SIZE); | |
if (length <= 0) | |
return -1; | |
return zconfig_reload (&self->users); | |
} | |
#endif | |
static self_t * | |
s_self_new (zsock_t *pipe, const char *filename) | |
{ | |
assert (pipe); | |
assert (zsock_is (pipe)); | |
assert (filename); | |
self_t *self = (self_t *) zmalloc (sizeof (self_t)); | |
if (!self) | |
return NULL; | |
int rc = -1; | |
self->tag = S_SELF_TAG; | |
self->pipe = pipe; | |
self->users = zconfig_load (filename); | |
if (self->users) | |
self->not_authed = zsys_sprintf (NOT_AUTHORIZED, "DeadBeef"); | |
if (self->not_authed) | |
self->base64 = zarmour_new (); | |
if (self->base64) | |
self->reactor = zloop_new (); | |
if (self->reactor) | |
rc = zloop_reader (self->reactor, pipe, s_self_handle_pipe, self); | |
#if defined (__UTYPE_LINUX) | |
if (rc != -1) | |
rc = self->fd = inotify_init1 (IN_NONBLOCK); | |
if (rc != -1) | |
rc = self->wd = inotify_add_watch (self->fd, zconfig_filename (self->users), IN_CREATE | IN_MODIFY); | |
if (rc != -1) { | |
zmq_pollitem_t pollitem = { NULL, self->fd, ZMQ_POLLIN, 0 }; | |
rc = zloop_poller (self->reactor, &pollitem, s_self_handle_inotify, self); | |
} | |
#endif | |
if (rc != -1) { | |
self->verbose = false; | |
self->webserver = NULL; | |
self->port = 0; | |
} | |
else | |
s_self_destroy(&self); | |
return self; | |
} | |
void | |
http_auth (zsock_t *pipe, void *filename) | |
{ | |
assert (pipe); | |
assert (zsock_is (pipe)); | |
assert (filename); | |
int rc = sodium_init (); | |
if (rc != -1) { | |
self_t *self = s_self_new (pipe, (const char *) filename); | |
if (self) { | |
zsock_signal (pipe, 0); | |
zloop_start (self->reactor); | |
s_self_destroy (&self); | |
} | |
} | |
} | |
int main (void) | |
{ | |
int rc = -1; | |
zactor_t *auth = zactor_new (http_auth, "foo.txt"); | |
if (auth) { | |
rc = zstr_send (auth, "VERBOSE"); | |
if (rc == 0) | |
rc = zsock_wait (auth); | |
if (rc == 0) | |
rc = zstr_sendx (auth, "BIND", "tcp://127.0.0.1:8087", NULL); | |
if (rc == 0) | |
rc = zsock_wait (auth); | |
if (rc == -1) | |
return rc; | |
void *sock = zactor_resolve (auth); | |
while (!zsys_interrupted) { | |
if (zsocket_poll (sock, -1)) { | |
zmsg_t *msg = zmsg_recv (auth); | |
if (msg) | |
zmsg_destroy (&msg); | |
else | |
break; | |
} | |
else | |
break; | |
} | |
zactor_destroy (&auth); | |
} | |
return rc; | |
} | |
#if defined (__UTYPE_LINUX) | |
#undef INOTIFY_EVENT_SIZE | |
#undef INOTIFY_BUFF_SIZE | |
#endif | |
#undef S_SELF_TAG | |
#undef MAX_HEADERS | |
#undef MALFORMED_REQUEST | |
#undef METHOD_NOT_ALLOWED | |
#undef SERVICE_UNAVAILABLE | |
#undef NOT_AUTHORIZED | |
#undef OKAY |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
hallo | |
*/ | |
#ifdef __cplusplus | |
extern "C" { | |
#endif | |
CZMQ_EXPORT void | |
http_auth (zsock_t *pipe, void *filename); | |
#ifdef __cplusplus | |
} | |
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
hallo | |
*/ | |
#include <czmq.h> | |
#include "picohttpparser/picohttpparser.h" | |
#include <wslay/wslay.h> | |
#define S_SELF_TAG 0x2314cafe | |
#define S_PEER_TAG 0x2342cafe | |
#define MALFORMED_REQUEST "HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n" | |
#define METHOD_NOT_ALLOWED "HTTP/1.1 405 Method not allowed\r\nConnection: close\r\n\r\n" | |
#define SERVICE_UNAVAILABLE "HTTP/1.1 503 Service Unavailable\r\nConnection: close\r\n\r\n" | |
#define UPGRADE_REQUIRED "HTTP/1.1 426 Upgrade required\r\nUpgrade: websocket\r\nConnection: Upgrade\r\n\r\n" | |
#define WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" | |
#define WS_UPGRADE "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %s\r\n\r\n" | |
#define MAX_HEADERS 20 | |
static bool | |
strrcmp(const char* s, size_t l, const char* t) | |
{ | |
return strlen(t) == l && memcmp(s, t, l) == 0; | |
} | |
typedef struct { | |
uint32_t tag; | |
bool verbose; | |
bool running; | |
zsock_t *pipe; | |
zloop_t *reactor; | |
zsock_t *webserver; | |
int port; | |
zhashx_t *peers; | |
struct wslay_event_callbacks wslay_cbs; | |
} self_t; | |
typedef struct { | |
uint32_t tag; | |
zframe_t *peer_frame; | |
char *peer_id_hex; | |
bool accepted; | |
int64_t latency; | |
wslay_event_context_ptr wslay_ctx; | |
self_t *self; | |
void *ticket; | |
byte *buffer; | |
size_t buffer_size; | |
} peer_t; | |
static bool | |
s_self_is (void *self) | |
{ | |
assert (self); | |
return ((self_t *) self)->tag == S_SELF_TAG; | |
} | |
static void | |
s_self_destroy (self_t **self_p) | |
{ | |
assert (self_p); | |
if (*self_p) { | |
self_t *self = *self_p; | |
assert (s_self_is (self)); | |
zhashx_destroy (&self->peers); | |
zloop_destroy (&self->reactor); | |
zsock_destroy (&self->webserver); | |
self->tag = 0xDeadBeef; | |
zsock_signal (self->pipe, 0); | |
free (self); | |
*self_p = NULL; | |
} | |
} | |
static bool | |
s_peer_is (void *peer) | |
{ | |
assert (peer); | |
return ((peer_t *) peer)->tag == S_PEER_TAG; | |
} | |
static void | |
s_peer_destroy (peer_t **peer_p) | |
{ | |
assert (peer_p); | |
if (*peer_p) { | |
peer_t *peer = *peer_p; | |
assert (s_peer_is (peer)); | |
self_t *self = peer->self; | |
assert (self); | |
assert (s_self_is (self)); | |
zframe_destroy (&peer->peer_frame); | |
zstr_free (&peer->peer_id_hex); | |
if (peer->ticket) { | |
zloop_ticket_delete (self->reactor, peer->ticket); | |
peer->ticket = NULL; | |
} | |
if (peer->wslay_ctx) { | |
wslay_event_context_free (peer->wslay_ctx); | |
peer->wslay_ctx = NULL; | |
} | |
peer->buffer = NULL; | |
peer->buffer_size = 0; | |
peer->tag = 0xDeadBeef; | |
free (peer); | |
*peer_p = NULL; | |
} | |
} | |
static void | |
s_peer_disconnect (peer_t *peer) | |
{ | |
assert (peer); | |
assert (s_peer_is (peer)); | |
self_t *self = peer->self; | |
assert (self); | |
assert (s_self_is (self)); | |
int sndtimeo = zsock_sndtimeo (self->webserver); | |
zsock_set_sndtimeo (self->webserver, 0); | |
(void) zframe_send (&peer->peer_frame, self->webserver, ZFRAME_MORE); | |
(void) zstr_sendm (self->webserver, ""); | |
zsock_set_sndtimeo (self->webserver, sndtimeo); | |
zhashx_delete (self->peers, peer->peer_id_hex); | |
} | |
static peer_t * | |
s_peer_new (self_t *self, zframe_t *peer_frame) | |
{ | |
assert (self); | |
assert (s_self_is (self)); | |
assert (peer_frame); | |
assert (zframe_is (peer_frame)); | |
peer_t *peer = (peer_t *) zmalloc (sizeof (peer_t)); | |
if (peer) { | |
peer->peer_frame = zframe_dup (peer_frame); | |
if (peer->peer_frame) { | |
peer->peer_id_hex = zframe_strhex (peer->peer_frame); | |
if (peer->peer_id_hex) { | |
peer->accepted = false; | |
peer->self = self; | |
peer->tag = S_PEER_TAG; | |
} | |
} | |
if (peer->tag != S_PEER_TAG) | |
s_peer_destroy (&peer); | |
} | |
return peer; | |
} | |
static int | |
s_peer_read (peer_t *peer, zframe_t *request) | |
{ | |
assert (peer); | |
assert (s_peer_is (peer)); | |
assert (request); | |
assert (zframe_is (request)); | |
peer->buffer = zframe_data (request); | |
peer->buffer_size = zframe_size (request); | |
int rc = 0; | |
if (wslay_event_want_read (peer->wslay_ctx)) | |
rc = wslay_event_recv (peer->wslay_ctx); | |
return rc; | |
} | |
static int | |
s_peer_write (peer_t *peer) | |
{ | |
assert (peer); | |
assert (s_peer_is (peer)); | |
int rc = 0; | |
while (wslay_event_want_write (peer->wslay_ctx)) { | |
rc = wslay_event_send (peer->wslay_ctx); | |
if (rc != 0) | |
break; | |
} | |
return rc; | |
} | |
static int | |
s_peer_timeout (zloop_t *loop, int timer_id, void *arg) | |
{ | |
peer_t *peer = (peer_t *) arg; | |
assert (peer); | |
assert (s_peer_is (peer)); | |
s_peer_disconnect (peer); | |
return 0; | |
} | |
static int | |
s_write (peer_t *peer, const uint8_t *data, size_t len) | |
{ | |
assert (peer); | |
assert (s_peer_is (peer)); | |
assert (data); | |
self_t *self = peer->self; | |
assert (self); | |
assert (s_self_is (self)); | |
int rc = zframe_send (&peer->peer_frame, self->webserver, ZFRAME_MORE | ZFRAME_REUSE); | |
if (rc == 0) { | |
zframe_t *msg = zframe_new (data, len); | |
if (msg) | |
rc = zframe_send (&msg, self->webserver, ZFRAME_MORE); | |
else | |
rc = -1; | |
} | |
return rc; | |
} | |
static ssize_t | |
s_wslay_recv_cb (wslay_event_context_ptr ctx, uint8_t *buf, size_t len, int flags, void *user_data) | |
{ | |
peer_t *peer = (peer_t *) user_data; | |
assert (peer); | |
assert (s_peer_is (peer)); | |
if (peer->buffer) { | |
if (peer->buffer_size > len) { | |
memmove (buf, peer->buffer, len); | |
peer->buffer_size -= len; | |
peer->buffer += len; | |
return len; | |
} | |
else { | |
memmove (buf, peer->buffer, peer->buffer_size); | |
int buffer_size = peer->buffer_size; | |
peer->buffer = NULL; | |
peer->buffer_size = 0; | |
return buffer_size; | |
} | |
} | |
else { | |
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); | |
return -1; | |
} | |
} | |
static ssize_t | |
s_wslay_send_cb (wslay_event_context_ptr ctx, const uint8_t *data, size_t len, int flags, void *user_data) | |
{ | |
peer_t *peer = (peer_t *) user_data; | |
assert (peer); | |
assert (s_peer_is (peer)); | |
int rc = s_write (peer, data, len); | |
if (rc == -1) { | |
if (zmq_errno () == EAGAIN) | |
wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); | |
else | |
wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE); | |
return rc; | |
} | |
return len; | |
} | |
static void | |
s_wslay_on_msg_recv_cb (wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg *arg, void *user_data) | |
{ | |
peer_t *peer = (peer_t *) user_data; | |
assert (peer); | |
assert (s_peer_is (peer)); | |
self_t *self = peer->self; | |
assert (self); | |
assert (s_self_is (self)); | |
zloop_ticket_reset (self->reactor, peer->ticket); | |
if (wslay_is_ctrl_frame (arg->opcode)) { | |
if (arg->opcode == WSLAY_PONG) | |
peer->latency = zclock_usecs () - peer->latency; | |
} | |
else { | |
struct wslay_event_msg msgarg = { | |
arg->opcode, arg->msg, arg->msg_length | |
}; | |
wslay_event_queue_msg (ctx, &msgarg); | |
(void) zsock_bsend (self->pipe, "s811S82", peer->peer_id_hex, | |
peer->latency, arg->rsv, | |
arg->opcode, arg->msg, | |
arg->msg_length, | |
arg->status_code); | |
} | |
} | |
static int | |
create_accept_key(char *dst, const char *peer_key) | |
{ | |
assert (dst); | |
assert (peer_key); | |
int rc = -1; | |
byte key_src [60]; | |
memmove (key_src, peer_key, 24); | |
memmove (key_src+24, WS_GUID, 36); | |
zdigest_t *digest = zdigest_new (); | |
if (digest) { | |
zdigest_update (digest, key_src, sizeof (key_src)); | |
zarmour_t *zarmour = zarmour_new (); | |
if (zarmour) { | |
char *encoded = zarmour_encode (zarmour, zdigest_data (digest), zdigest_size (digest)); | |
zarmour_destroy (&zarmour); | |
if (encoded) { | |
strcpy (dst, encoded); | |
zstr_free (&encoded); | |
rc = 0; | |
} | |
} | |
zdigest_destroy (&digest); | |
} | |
return rc; | |
} | |
static int | |
s_websocket_handshake (peer_t *peer, zframe_t *request) | |
{ | |
assert (peer); | |
assert (s_peer_is (peer)); | |
assert (request); | |
assert (zframe_is (request)); | |
const char* method; | |
size_t method_len; | |
const char* path; | |
size_t path_len; | |
int minor_version; | |
struct phr_header headers[MAX_HEADERS]; | |
size_t num_headers = MAX_HEADERS; | |
int rc = phr_parse_request ((const char *) zframe_data (request), | |
zframe_size (request), &method, &method_len, | |
&path, &path_len, &minor_version, | |
headers, &num_headers, 0); | |
if (rc > 0) { | |
if (strrcmp (method, method_len, "GET")) { | |
bool is_websocket = false; | |
bool has_ws_ver = false; | |
const char *secWebSocketKey = NULL; | |
for (int i = 0; i < num_headers; ++i) { | |
if (strrcmp (headers[i].name, headers[i].name_len, "Upgrade") && | |
(strrcmp (headers[i].value, headers[i].value_len, "websocket") || | |
strrcmp (headers[i].value, headers[i].value_len, "WebSocket"))) | |
is_websocket = true; | |
if (strrcmp (headers[i].name, headers[i].name_len, "Sec-WebSocket-Version") && | |
strrcmp (headers[i].value, headers[i].value_len, "13")) | |
has_ws_ver = true; | |
if (strrcmp (headers[i].name, headers[i].name_len, "Sec-WebSocket-Key") && | |
headers[i].value_len == 24) | |
secWebSocketKey = headers[i].value; | |
} | |
if (is_websocket && has_ws_ver && secWebSocketKey) { | |
char accept_key [29]; | |
rc = create_accept_key (accept_key, secWebSocketKey); | |
if (rc == 0) { | |
char *handshake = zsys_sprintf (WS_UPGRADE, accept_key); | |
if (handshake) { | |
rc = s_write (peer, (const unsigned char *) handshake, strlen (handshake)); | |
zstr_free (&handshake); | |
if (rc == 0) { | |
self_t *self = peer->self; | |
assert (self); | |
assert (s_self_is (self)); | |
rc = wslay_event_context_server_init(&peer->wslay_ctx, &self->wslay_cbs, peer); | |
if (rc == 0) | |
peer->ticket = zloop_ticket (self->reactor, (zloop_timer_fn *) s_peer_timeout, peer); | |
if (peer->ticket) | |
peer->accepted = true; | |
else { | |
rc = s_write (peer, (const unsigned char *) SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE)); | |
rc = -1; | |
} | |
} | |
} | |
else { | |
rc = s_write (peer, (const unsigned char *) SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE)); | |
rc = -1; | |
} | |
} | |
else { | |
rc = s_write (peer, (const unsigned char *) SERVICE_UNAVAILABLE, strlen (SERVICE_UNAVAILABLE)); | |
rc = -1; | |
} | |
} | |
else { | |
rc = s_write (peer, (const unsigned char *) UPGRADE_REQUIRED, strlen (UPGRADE_REQUIRED)); | |
rc = -1; | |
} | |
} | |
else { | |
rc = s_write (peer, (const unsigned char *) METHOD_NOT_ALLOWED, strlen (METHOD_NOT_ALLOWED)); | |
rc = -1; | |
} | |
} | |
else { | |
rc = s_write (peer, (const unsigned char *) MALFORMED_REQUEST, strlen (MALFORMED_REQUEST)); | |
rc = -1; | |
} | |
return rc; | |
} | |
static int | |
s_self_latency (zloop_t *loop, int timer_id, void *arg) | |
{ | |
self_t *self = (self_t *) arg; | |
assert (self); | |
assert (s_self_is (self)); | |
peer_t *peer = (peer_t *) zhashx_first (self->peers); | |
struct wslay_event_msg msgarg = { | |
WSLAY_PING, NULL, 0 | |
}; | |
int rc = 0; | |
while (peer && peer->wslay_ctx) { | |
assert (s_peer_is (peer)); | |
rc = wslay_event_queue_msg (peer->wslay_ctx, &msgarg); | |
if (rc == 0) { | |
rc = s_peer_write (peer); | |
if (rc == 0) | |
peer->latency = zclock_usecs (); | |
} | |
peer = (peer_t *) zhashx_next (self->peers); | |
} | |
return 0; | |
} | |
static int | |
s_self_handle_webserver (zloop_t *reactor, zsock_t *webserver, void *arg) | |
{ | |
zframe_t *peer_frame = zframe_recv (webserver); | |
if (!peer_frame) | |
return -1; | |
zframe_t *request = zframe_recv (webserver); | |
if (!request) { | |
zframe_destroy (&peer_frame); | |
return -1; | |
} | |
char *peer_id_hex = zframe_strhex (peer_frame); | |
if (peer_id_hex) { | |
self_t *self = (self_t *) arg; | |
assert (self); | |
assert (s_self_is (self)); | |
peer_t *peer = NULL; | |
int rc = -1; | |
if (zframe_size (request) == 0) { | |
peer = (peer_t *) zhashx_lookup (self->peers, peer_id_hex); | |
if (peer) { | |
zhashx_delete (self->peers, peer_id_hex); | |
rc = 0; | |
} | |
else { | |
peer = s_peer_new (self, peer_frame); | |
if (peer) | |
rc = zhashx_insert (self->peers, peer_id_hex, peer); | |
else { | |
int sndtimeo = zsock_sndtimeo (webserver); | |
zsock_set_sndtimeo (webserver, 0); | |
(void) zframe_send (&peer_frame, webserver, ZFRAME_MORE); | |
(void) zstr_sendm (webserver, ""); | |
zsock_set_sndtimeo (webserver, sndtimeo); | |
} | |
} | |
} | |
else { | |
peer = (peer_t *) zhashx_lookup (self->peers, peer_id_hex); | |
assert (peer); | |
assert (s_peer_is (peer)); | |
if (peer->accepted) { | |
rc = s_peer_read (peer, request); | |
if (rc == 0) { | |
if (wslay_event_get_close_received (peer->wslay_ctx)) | |
s_peer_disconnect (peer); | |
else { | |
rc = s_peer_write (peer); | |
if (rc == 0) { | |
if (wslay_event_get_close_sent (peer->wslay_ctx)) | |
s_peer_disconnect (peer); | |
} | |
} | |
} | |
} | |
else | |
rc = s_websocket_handshake (peer, request); | |
} | |
if (rc == -1) | |
s_peer_disconnect (peer); | |
zstr_free (&peer_id_hex); | |
} | |
else { | |
int sndtimeo = zsock_sndtimeo (webserver); | |
zsock_set_sndtimeo (webserver, 0); | |
(void) zframe_send (&peer_frame, webserver, ZFRAME_MORE); | |
(void) zstr_sendm (webserver, ""); | |
zsock_set_sndtimeo (webserver, sndtimeo); | |
} | |
zframe_destroy (&request); | |
zframe_destroy (&peer_frame); | |
return 0; | |
} | |
static int | |
s_self_start (self_t *self) | |
{ | |
assert (self); | |
assert (s_self_is (self)); | |
int rc = 0; | |
if (!self->running) { | |
rc = zloop_reader (self->reactor, self->webserver, (zloop_reader_fn *) s_self_handle_webserver, self); | |
if (rc != -1) | |
rc = zloop_timer (self->reactor, 4999, 0, (zloop_timer_fn *) s_self_latency, self); | |
if (rc != -1) { | |
rc = 0; | |
self->running = true; | |
} | |
else | |
zloop_reader_end (self->reactor, self->webserver); | |
} | |
return rc; | |
} | |
static int | |
s_self_bind (self_t *self, const char *endpoint) | |
{ | |
assert (self); | |
assert (s_self_is (self)); | |
assert (endpoint); | |
int port = zsock_bind (self->webserver, "%s", endpoint); | |
if (port != -1) { | |
self->port = port; | |
return 0; | |
} | |
return port; | |
} | |
static int | |
s_self_unbind (self_t *self, const char *endpoint) | |
{ | |
assert (self); | |
assert (s_self_is (self)); | |
assert (endpoint); | |
return zsock_unbind (self->webserver, "%s", endpoint); | |
} | |
static int | |
s_self_handle_pipe (zloop_t *reactor, zsock_t *pipe, void *arg) | |
{ | |
zmsg_t *request = zmsg_recv (pipe); | |
if (!request) | |
return -1; | |
int rc = -1; | |
char *command = zmsg_popstr (request); | |
if (command) { | |
self_t *self = (self_t *) arg; | |
assert (self); | |
assert (s_self_is (self)); | |
if (self->verbose) | |
zsys_info ("API command=%s", command); | |
if (streq (command, "$TERM")) { | |
s_self_destroy (&self); | |
rc = 0; | |
} | |
else | |
if (streq (command, "START")) | |
rc = s_self_start (self); | |
else | |
if (streq (command, "VERBOSE")) { | |
self->verbose = true; | |
zloop_set_verbose (reactor, true); | |
rc = 0; | |
} | |
else | |
if (streq (command, "BIND")) { | |
char *endpoint = zmsg_popstr (request); | |
if (endpoint) { | |
rc = s_self_bind (self, endpoint); | |
zstr_free (&endpoint); | |
} | |
} | |
else | |
if (streq (command, "UNBIND")) { | |
char *endpoint = zmsg_popstr (request); | |
if (endpoint) { | |
rc = s_self_unbind (self, endpoint); | |
zstr_free (&endpoint); | |
} | |
} | |
else | |
if (streq (command, "SEND_STR")) { | |
char *peer_id_hex = zmsg_popstr (request); | |
if (peer_id_hex) { | |
peer_t *peer = (peer_t *) zhashx_lookup (self->peers, peer_id_hex); | |
zstr_free (&peer_id_hex); | |
assert (peer); | |
assert (s_peer_is (peer)); | |
zframe_t *msg = zmsg_pop (request); | |
struct wslay_event_msg msgarg = { | |
WSLAY_TEXT_FRAME, zframe_data (msg), zframe_size (msg) | |
}; | |
zframe_destroy (&msg); | |
rc = wslay_event_queue_msg (peer->wslay_ctx, &msgarg); | |
if (rc == 0) { | |
rc = s_peer_write (peer); | |
if (rc != 0) | |
s_peer_disconnect (peer); | |
} | |
else | |
s_peer_disconnect (peer); | |
} | |
} | |
else { | |
zsys_error ("invalid command: %s", command); | |
assert (false); | |
} | |
zstr_free (&command); | |
} | |
zmsg_destroy (&request); | |
return zsock_signal (pipe, rc); | |
} | |
static self_t * | |
s_self_new (zsock_t *pipe) | |
{ | |
assert (zsock_is (pipe)); | |
self_t *self = (self_t *) zmalloc (sizeof (self_t)); | |
if (!self) | |
return NULL; | |
self->verbose = false; | |
self->running = false; | |
self->pipe = pipe; | |
self->reactor = zloop_new (); | |
if (self->reactor) | |
self->webserver = zsock_new (ZMQ_STREAM); | |
self->port = 0; | |
if (self->webserver) | |
self->peers = zhashx_new (); | |
if (!self->peers) { | |
s_self_destroy (&self); | |
return NULL; | |
} | |
zhashx_set_destructor (self->peers, (czmq_destructor *) s_peer_destroy); | |
int rc = zloop_reader (self->reactor, self->pipe, (zloop_reader_fn *) s_self_handle_pipe, self); | |
if (rc == -1) { | |
s_self_destroy (&self); | |
return NULL; | |
} | |
zloop_set_ticket_delay (self->reactor, 45000); | |
self->wslay_cbs.recv_callback = (wslay_event_recv_callback) s_wslay_recv_cb; | |
self->wslay_cbs.send_callback = (wslay_event_send_callback) s_wslay_send_cb; | |
self->wslay_cbs.on_msg_recv_callback = (wslay_event_on_msg_recv_callback) s_wslay_on_msg_recv_cb; | |
self->tag = S_SELF_TAG; | |
return self; | |
} | |
void | |
zwebsock (zsock_t *pipe, void *unused) | |
{ | |
assert (pipe); | |
assert (zsock_is (pipe)); | |
assert (unused == NULL); | |
self_t *self = s_self_new (pipe); | |
if (self) { | |
zsock_signal (self->pipe, 0); | |
zloop_start (self->reactor); | |
s_self_destroy (&self); | |
} | |
} | |
int main(void) | |
{ | |
int rc = -1; | |
zsys_set_ipv6 (1); | |
zsys_set_logident ("zwebsock"); | |
zactor_t *websock = zactor_new (zwebsock, NULL); | |
assert (websock); | |
rc = zstr_send (websock, "VERBOSE"); | |
if (rc == 0) | |
rc = zsock_wait (websock); | |
if (rc == 0) | |
rc = zstr_sendx (websock, "BIND", "tcp://127.0.0.1:9001", NULL); | |
if (rc == 0) | |
rc = zsock_wait (websock); | |
if (rc == 0) | |
rc = zstr_send (websock, "START"); | |
if (rc == 0) | |
rc = zsock_wait (websock); | |
if (rc == -1) | |
goto fail; | |
void *sock = zactor_resolve (websock); | |
char *peer_id_hex; | |
int64_t latency; | |
uint8_t rsv; | |
uint8_t opcode; | |
uint8_t *msg; | |
size_t msg_length; | |
uint16_t status_code; | |
while (!zsys_interrupted) { | |
if (zsocket_poll (sock, -1)) { | |
rc = zsock_brecv (websock, "s811S82", &peer_id_hex, &latency, &rsv, &opcode, &msg, &msg_length, &status_code); | |
zsys_info ("recv: peer_id_hex: %s latency: %jd rsv: %u opcode: %u msg: %s length: %zu status_code: %u", peer_id_hex, latency, rsv, opcode, msg, msg_length, status_code); | |
free (msg); | |
} | |
else | |
break; | |
} | |
fail: | |
zactor_destroy (&websock); | |
return rc; | |
} | |
#undef S_SELF_TAG | |
#undef S_PEER_TAG | |
#undef MALFORMED_REQUEST | |
#undef METHOD_NOT_ALLOWED | |
#undef SERVICE_UNAVAILABLE | |
#undef UPGRADE_REQUIRED | |
#undef WS_GUID | |
#undef WS_UPGRADE | |
#undef MAX_HEADERS |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bundler/setup' | |
require 'czmq' | |
require 'http/parser' | |
require 'digest/sha1' | |
require 'base64' | |
require 'wslay' | |
require 'hitimes' | |
module Zwebsock | |
module Helpers | |
module_function | |
def destroy_zframe(zframe) | |
FFI::MemoryPointer.new(:pointer) do |zframe_ptr| | |
zframe_ptr.write_pointer(zframe) | |
CZMQ::Zframe.destructor(zframe_ptr) | |
end | |
end | |
def read_zframe(zframe) | |
CZMQ::Zframe.data(zframe).read_bytes(CZMQ::Zframe.size(zframe)) | |
end | |
def recv_zframe(src) | |
if (zframe = CZMQ::Zframe.recv_zframe(src)) | |
zframe | |
else | |
raise IOError, CZMQ::Utils.error, caller | |
end | |
end | |
def send_zframe(zframe, dest, flags) | |
FFI::MemoryPointer.new(:pointer) do |zframe_ptr| | |
zframe_ptr.write_pointer(zframe) | |
unless (CZMQ::Zframe.send_zframe(zframe_ptr, dest, flags)).zero? | |
unless CZMQ::Zsys.interrupted | |
raise IOError, CZMQ::Utils.error, caller | |
end | |
end | |
end | |
end | |
def new_zframe(data, len) | |
if (zframe = CZMQ::Zframe.constructor(data, len)) | |
zframe | |
else | |
raise NoMemoryError, CZMQ::Utils.error, caller | |
end | |
end | |
def hex_zframe(zframe) | |
if (hex_frame = CZMQ::Zframe.strhex(zframe)) | |
CZMQ::Zstr.read_string(hex_frame) | |
else | |
raise NoMemoryError, CZMQ::Utils.error, caller | |
end | |
end | |
end | |
end | |
module Zwebsock | |
class WslayError < StandardError; end | |
end | |
module Zwebsock | |
class Peer | |
attr_reader :peer_id, :peer_id_hex | |
attr_accessor :accepted, :latency, :wslay_ctx, :buffer, :buffer_size | |
def initialize(peer_id, peer_id_hex) | |
@peer_id = peer_id | |
@peer_id_hex = peer_id_hex | |
end | |
def tell(data) | |
case data | |
when String | |
case data.encoding | |
when Encoding::UTF_8 | |
FFI::MemoryPointer.new(:uint8, data.bytesize) do |msg| | |
msg.write_bytes(data) | |
send_data(Wslay::OpCode[:text_frame], msg, msg.size) | |
end | |
when Encoding::ASCII_8BIT | |
FFI::MemoryPointer.new(:uint8, data.bytesize) do |msg| | |
msg.write_bytes(data) | |
send_data(Wslay::OpCode[:binary_frame], msg, msg.size) | |
end | |
else | |
fail ArgumentError, "Data String must be in UTF-8 or binary encoding" | |
end | |
when Array | |
FFI::MemoryPointer.new(:uint8, data.size) do |msg| | |
msg.write_array_of_bytes(data) | |
send_data(Wslay::OpCode[:binary_frame], msg, msg.size) | |
end | |
when FFI::Pointer | |
send_data(Wslay::OpCode[:binary_frame], data, data.size) | |
else | |
fail ArgumentError, "Data is neither a String, Array, or FFI::Pointer" | |
end | |
self | |
end | |
alias_method :<<, :tell | |
def send_data(opcode, msg, msg_length) | |
response = Wslay::Event::Msg.new | |
response[:opcode] = opcode | |
response[:msg] = msg | |
response[:msg_length] = msg_length | |
if (rc = Wslay::Event.queue_msg(wslay_ctx, response)) == 0 | |
write | |
else | |
if rc == :nomem | |
raise NoMemoryError | |
else | |
raise WslayError, rc.to_s | |
end | |
end | |
end | |
def ping | |
msg = Wslay::Event::Msg.new | |
msg[:opcode] = Wslay::OpCode[:ping] | |
if (rc = Wslay::Event.queue_msg(wslay_ctx, msg)) == 0 | |
write | |
latency.start | |
0 | |
else | |
if rc == :nomem | |
raise NoMemoryError | |
else | |
raise WslayError, rc.to_s | |
end | |
end | |
end | |
def read(request) | |
rc = 0 | |
if Wslay::Event.want_read(wslay_ctx) | |
self.buffer = CZMQ::Zframe.data(request) | |
self.buffer_size = CZMQ::Zframe.size(request) | |
unless (rc = Wslay::Event.read(wslay_ctx)) == 0 | |
if rc == :nomem | |
raise NoMemoryError | |
else | |
raise WslayError, rc.to_s | |
end | |
end | |
end | |
rc | |
end | |
def write | |
rc = 0 | |
while Wslay::Event.want_write(wslay_ctx) | |
unless (rc = Wslay::Event.write(wslay_ctx)) == 0 | |
if rc == :nomem | |
raise NoMemoryError | |
else | |
raise WslayError, rc.to_s | |
end | |
end | |
end | |
rc | |
end | |
end | |
class Server | |
include Helpers | |
include CZMQ | |
extend Forwardable | |
GET = 'GET'.freeze | |
UPGRADE = 'Upgrade'.freeze | |
WEBSOCKET = 'websocket'.freeze | |
SEC_WEBSOCKET_KEY = 'Sec-WebSocket-Key'.freeze | |
SEC_WEBSOCKET_VERSION = 'Sec-WebSocket-Version'.freeze | |
CRLF = "\r\n".freeze | |
HANDSHAKE = "HTTP/1.1 101 Switching Protocols#{CRLF}#{UPGRADE}: #{WEBSOCKET}#{CRLF}Connection: #{UPGRADE}#{CRLF}Sec-WebSocket-Accept: %s#{CRLF * 2}".freeze | |
UPGRADE_REQUIRED = FFI::MemoryPointer.from_string("HTTP/1.1 426 #{UPGRADE} Required#{CRLF}#{UPGRADE}: #{WEBSOCKET}#{CRLF}Connection: #{UPGRADE}#{CRLF * 2}").freeze | |
EMPTY_STR = ''.freeze | |
def_delegators :@parent_pipe, :<<, :tell, :recv, :wait, :signal, :destructor | |
def initialize(endpoint, &callback) | |
@endpoint = endpoint | |
@callback = callback | |
@parent_pipe = Zactor.new_actor(&method(:run)) | |
at_exit { destructor } | |
end | |
private | |
def write(peer, data, len) | |
send_zframe(new_zframe(peer.peer_id, peer.peer_id.size), @server.to_zsock, Zframe::MORE) | |
send_zframe(new_zframe(data, len), @server.to_zsock, Zframe::MORE) | |
rescue IOError => e | |
if Utils.errno == Errno::EHOSTUNREACH::Errno | |
delete_peer(peer) | |
else | |
raise e | |
end | |
end | |
def disconnect(peer) | |
@server.set_sndtimeo(0) | |
send_zframe(new_zframe(peer.peer_id, peer.peer_id.size), @server.to_zsock, Zframe::MORE) | |
Zstr.sendm(@server.to_zsock, EMPTY_STR) | |
@server.set_sndtimeo(-1) | |
rescue IOError | |
nil | |
ensure | |
delete_peer(peer) | |
end | |
def delete_peer(peer) | |
if peer.wslay_ctx | |
peer.wslay_ctx = Wslay::Event::Context.free(peer.wslay_ctx) | |
end | |
@peers.delete(peer.peer_id_hex) | |
end | |
def run(child_pipe) | |
Thread.current.abort_on_exception = true | |
@child_pipe = child_pipe | |
@peers = {} | |
@reactor = Zloop.new | |
@reactor.add_reader(@child_pipe, &method(:handle_pipe)) | |
@server = Zsock.new(:stream) | |
@server.set_maxmsgsize(1024 * 1024 + 8192) | |
@port = @server.bind(@endpoint) | |
if Utils.version[:zmq][:major] >= 4 && Utils.version[:zmq][:minor] >= 1 | |
zloop_reader_fn = FFI::Function.new(:int, [:pointer, :pointer, :pointer], blocking: true) do |zloop_t, zsock_t, args| | |
handle_server_4_1(zsock_t) | |
end | |
else | |
zloop_reader_fn = FFI::Function.new(:int, [:pointer, :pointer, :pointer], blocking: true) do |zloop_t, zsock_t, args| | |
handle_server(zsock_t) | |
end | |
end | |
Zloop.reader(@reactor.to_zloop, @server.to_zsock, zloop_reader_fn, nil) | |
@reactor.add_timer(4999, 0, &method(:handle_pings)) | |
@wslay_callbacks = Wslay::Event::Callbacks.new | |
@wslay_callbacks.recv_callback(&method(:recv_callback)) | |
@wslay_callbacks.send_callback(&method(:send_callback)) | |
@wslay_callbacks.on_msg_recv_callback(&method(:on_msg_recv_callback)) | |
@parser = HTTP::Parser.new | |
@child_pipe.signal(0) | |
Zsys.info "zwebsock: - started on #{@endpoint}, port: #{@port}" | |
@reactor.start | |
rescue => e | |
Zsys.error(format_exception(e)) | |
@child_pipe.signal(-1) | |
end | |
def handle_pipe(zsock) | |
msg = zsock.recv | |
case msg.first.to_str | |
when '$TERM' | |
@peers.each_value do |peer| | |
disconnect(peer) | |
end | |
0 | |
when 'PORT' | |
Zstr.send_zstr(zsock.to_zsock, @port.to_s) | |
end | |
rescue => e | |
Zsys.error(format_exception(e)) | |
zsock.signal(-1) | |
-1 | |
end | |
def handle_server(zsock) | |
peer_frame = nil | |
request = nil | |
peer = nil | |
peer_frame = recv_zframe(zsock) | |
request = recv_zframe(zsock) | |
unless @peers.key?(peer_id_hex = hex_zframe(peer_frame)) | |
peer_id = FFI::MemoryPointer.new(:uchar, Zframe.size(peer_frame)) | |
peer_id.__copy_from__(Zframe.data(peer_frame), peer_id.size) | |
peer = @peers[peer_id_hex] = Peer.new(peer_id, peer_id_hex) | |
end | |
peer = @peers[peer_id_hex] | |
if peer.accepted | |
peer.read(request) | |
if Wslay::Event.get_close_received(peer.wslay_ctx) | |
disconnect(peer) | |
else | |
peer.write | |
if Wslay::Event.get_close_sent(peer.wslay_ctx) | |
disconnect(peer) | |
end | |
end | |
else | |
websocket_handshake(peer, request) | |
end | |
0 | |
rescue WslayError | |
disconnect(peer) if peer | |
0 | |
rescue => e | |
Zsys.error(format_exception(e)) | |
@child_pipe.signal(-1) | |
-1 | |
ensure | |
destroy_zframe(request) if request | |
destroy_zframe(peer_frame) if peer_frame | |
end | |
def handle_server_4_1(zsock) | |
peer_frame = nil | |
request = nil | |
peer = nil | |
peer_frame = recv_zframe(zsock) | |
peer_id_hex = hex_zframe(peer_frame) | |
request = recv_zframe(zsock) | |
if Zframe.size(request).zero? | |
if @peers.key?(peer_id_hex) | |
delete_peer(@peers[peer_id_hex]) | |
else | |
peer_id = FFI::MemoryPointer.new(:uchar, Zframe.size(peer_frame)) | |
peer_id.__copy_from__(Zframe.data(peer_frame), peer_id.size) | |
peer = @peers[peer_id_hex] = Peer.new(peer_id, peer_id_hex) | |
end | |
else | |
peer = @peers[peer_id_hex] | |
if peer.accepted | |
peer.read(request) | |
if Wslay::Event.get_close_received(peer.wslay_ctx) | |
disconnect(peer) | |
else | |
peer.write | |
if Wslay::Event.get_close_sent(peer.wslay_ctx) | |
disconnect(peer) | |
end | |
end | |
else | |
websocket_handshake(peer, request) | |
end | |
end | |
0 | |
rescue WslayError | |
disconnect(peer) if peer | |
0 | |
rescue => e | |
Zsys.error(format_exception(e)) | |
@child_pipe.signal(-1) | |
-1 | |
ensure | |
destroy_zframe(request) if request | |
destroy_zframe(peer_frame) if peer_frame | |
end | |
def handle_pings(timer_id) | |
peer = nil | |
@peers.each_value do |peer_handler| | |
peer = peer_handler | |
peer.ping | |
end | |
0 | |
rescue WslayError | |
disconnect(peer) if peer | |
0 | |
rescue => e | |
Zsys.error(format_exception(e)) | |
@child_pipe.signal(-1) | |
-1 | |
end | |
def websocket_handshake(peer, request) | |
@parser.reset! | |
@parser.on_headers_complete = proc do |headers| | |
if (@parser.http_method.casecmp(GET).zero? && | |
@parser.upgrade? && | |
headers[UPGRADE].casecmp(WEBSOCKET).zero? && | |
headers.key?(SEC_WEBSOCKET_KEY) && | |
headers[SEC_WEBSOCKET_KEY].bytesize == 24 && | |
headers[SEC_WEBSOCKET_VERSION].to_i == 13) | |
handshake = format(HANDSHAKE, create_accept_key(headers[SEC_WEBSOCKET_KEY])) | |
peer.latency = Hitimes::TimedMetric.new(:latency) | |
FFI::MemoryPointer.new(:pointer) do |wslay_ctx_ptr| | |
if (rc = Wslay::Event::Context.server_init(wslay_ctx_ptr, @wslay_callbacks, peer.peer_id_hex)) == 0 | |
wslay_ctx = wslay_ctx_ptr.read_pointer | |
Wslay::Event::Config.set_max_recv_msg_length(wslay_ctx, 1024 * 1024) | |
peer.wslay_ctx = wslay_ctx | |
write(peer, handshake, handshake.bytesize) | |
peer.accepted = true | |
else | |
if rc == :nomem | |
raise NoMemoryError | |
else | |
raise WslayError, rc.to_s | |
end | |
end | |
end | |
else | |
write(peer, UPGRADE_REQUIRED, UPGRADE_REQUIRED.size - 1) | |
disconnect(peer) | |
end | |
:stop | |
end | |
@parser << read_zframe(request) | |
end | |
def recv_callback(ctx, buf, len, flags, user_data) | |
peer = @peers[user_data.get_string(0)] | |
if (peer.buffer) | |
if peer.buffer_size > len | |
buf.__copy_from__(peer.buffer, len) | |
peer.buffer_size -= len | |
peer.buffer += len | |
len | |
else | |
buf.__copy_from__(peer.buffer, peer.buffer_size) | |
peer.buffer = nil | |
buffer_size = peer.buffer_size | |
peer.buffer_size = 0 | |
buffer_size | |
end | |
else | |
Wslay::Event.set_error(ctx, :would_block) | |
-1 | |
end | |
rescue => e | |
Zsys.error(format_exception(e)) | |
Wslay::Event.set_error(ctx, :callback_failure) | |
-1 | |
end | |
def send_callback(ctx, data, len, flags, user_data) | |
peer = @peers[user_data.get_string(0)] | |
write(peer, data, len) | |
len | |
rescue => e | |
if Utils.errno == Errno::EAGAIN::Errno | |
Wslay::Event.set_error(ctx, :would_block) | |
else | |
Zsys.error(format_exception(e)) | |
Wslay::Event.set_error(ctx, :callback_failure) | |
end | |
-1 | |
end | |
def on_msg_recv_callback(ctx, arg, user_data) | |
peer = @peers[user_data.get_string(0)] | |
if Wslay.is_ctrl_frame(arg[:opcode]) | |
case Wslay::OpCode[arg[:opcode]] | |
when :pong | |
Zsys.info("#{peer.peer_id_hex}: latency: #{peer.latency.stop}") | |
end | |
else | |
if arg[:msg_length] > 0 | |
case Wslay::OpCode[arg[:opcode]] | |
when :text_frame | |
msg = arg[:msg].read_bytes(arg[:msg_length]).force_encoding(Encoding::UTF_8) | |
when :binary_frame | |
msg = arg[:msg].read_bytes(arg[:msg_length]) | |
else | |
fail TypeError | |
end | |
if (response = @callback.call(peer.peer_id_hex, peer.latency, msg)) | |
if response.is_a?(Symbol) | |
if (rc = Wslay::Event.queue_close(ctx, response, nil, 0)) == 0 | |
peer.write | |
else | |
if rc == :nomem | |
raise NoMemoryError | |
else | |
raise WslayError, rc.to_s | |
end | |
end | |
else | |
peer.tell(response) | |
end | |
end | |
end | |
end | |
rescue => e | |
Zsys.error(format_exception(e)) | |
Wslay::Event.queue_close(ctx, :internal_server_error, nil, 0) | |
peer.write | |
end | |
def format_exception(e) | |
exception = "#{e}" | |
exception << "\n" << e.backtrace.join("\n\t") if e.backtrace | |
end | |
def create_accept_key(client_key) | |
Base64.strict_encode64(Digest::SHA1.digest("#{client_key}#{Wslay::WS_GUID}")) | |
end | |
end | |
end | |
server = Zwebsock::Server.new('tcp://*:9001') do |peer_id, latency, msg| | |
puts "#{peer_id}: recv #{msg}" | |
msg | |
end | |
server.wait |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment