Skip to content

Instantly share code, notes, and snippets.

@mbohun
Last active December 12, 2015 02:08
Show Gist options
  • Save mbohun/4696287 to your computer and use it in GitHub Desktop.
Save mbohun/4696287 to your computer and use it in GitHub Desktop.
test ===
#include <cstdlib> //for strtol
#include <iostream>
#include <sstream>
#include <string>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include "fifo.h"
#include <libwebsockets.h>
using boost::asio::ip::udp;
namespace {
const std::string MSG_HELP =
"USAGE: ./udp_mon <monitoring input port> [<forward host> <forward port>]\n"
"\n"
"\tEXAMPLE:\n"
"\t./udp_mon 2345 monitor incoming UDP stream on port 2345\n"
"\t./udp_mon 2345 192.168.0.86 2346 monitor incoming UDP stream on port 2345\n"
"\t and forward it to 192.168.0.86:2346\n";
enum {
// the max possible size of the UDP data chunk in bytes - ridiculous
UDP_DATA_SIZE_MAX = 65507,
// cat /sys/class/net/eth0/mtu => 1500
UDP_DATA_SIZE_MAX_MTU = (1500 - (16 + 20 + 8)),
// this is the static 'default' MAX for the datagram data/payload
DATA_SIZE_MAX = UDP_DATA_SIZE_MAX_MTU,
// arbitrary size of statically pre-allocated 'pool' of struct packet
STATIC_PACKET_POOL_SIZE = 100
};
struct packet {
udp::endpoint src; //TODO: this is a possible problem/BUG if we want a POD (for easy/fast use with memcpy)
std::size_t len;
unsigned long long time_stamp; //what units?
unsigned char data[DATA_SIZE_MAX];
};
void receive(udp::socket* sock_in, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
try {
for (;;) {
packet *p = fifo_in->pop();
p->len = sock_in->receive_from(boost::asio::buffer(p->data, DATA_SIZE_MAX),
p->src);
// TODO: this is a little overhead - all we need is save start time before entering this loop, and then record the ns/us diff
const boost::chrono::nanoseconds ns =
boost::chrono::duration_cast<boost::chrono::nanoseconds>(boost::chrono::system_clock::now().time_since_epoch());
p->time_stamp = ns.count();
fifo_out->push(p);
}
} catch (const std::exception& e) {
std::cerr << "receive exception:" << e.what() << std::endl;
return;
}
}
// TODO: this is a bit naive, forwarding and monitoring datagram are read only operations,
// therefore there is no reason why they shouldn't run in parallel
void forward(udp::socket* sock_out, udp::endpoint* remote, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
try {
for (;;) {
packet *p = fifo_in->pop();
size_t remain = p->len;
do {
const size_t sent_bytes =
sock_out->send_to(boost::asio::buffer(p->data, remain),
*remote);
remain = remain - sent_bytes;
} while (remain > 0);
fifo_out->push(p);
}
} catch (const std::exception& e) {
std::cerr << "forward exception:" << e.what() << std::endl;
return;
}
}
boost::mutex counter_mutex;
size_t counter_packet = 0;
size_t counter_packet_bytes = 0;
// TODO: SEPARATE the reading of datagram data FROM what to do with it
void monitor(fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
// NOTE: the very first packet is used to setup/init a 'session'
packet *p = fifo_in->pop();
const udp::endpoint endpoint_remote(p->src);
boost::mutex::scoped_lock lock(counter_mutex);
counter_packet = 1;
counter_packet_bytes = p->len;
lock.unlock();
fifo_out->push(p);
for (;;) {
boost::this_thread::yield();
p = fifo_in->pop();
// read/copy datagram values
const udp::endpoint ep(p->src);
const size_t packet_bytes = p->len;
const bool is_ts = (0x47 == (unsigned char)p->data[0]); //NOTE: trivial but fully working example of payload 'analysis'
fifo_out->push(p); //release datagram (recycle)
boost::this_thread::yield();
// verify remote endpoint - to avoid mixing datagrams stats from diff sources
if (endpoint_remote != ep) {
std::cerr << "receive DIFF endpoint:" << ep << std::endl;
break; //TODO: something more civilized - either post message or use some on_diff_endpoint_callback()
}
boost::mutex::scoped_lock lock(counter_mutex);
counter_packet++;
counter_packet_bytes += packet_bytes;
}
}
//TODO: dangerous idiotism - this works only if the list of enums remains unchanged,
// a proper sol. is std::map / std::pair<enum, string>
const char* reason2str(const enum libwebsocket_callback_reasons reason) {
static const char* REASONS[] = {
"LWS_CALLBACK_ESTABLISHED",
"LWS_CALLBACK_CLIENT_CONNECTION_ERROR",
"LWS_CALLBACK_CLIENT_ESTABLISHED",
"LWS_CALLBACK_CLOSED",
"LWS_CALLBACK_RECEIVE",
"LWS_CALLBACK_CLIENT_RECEIVE",
"LWS_CALLBACK_CLIENT_RECEIVE_PONG",
"LWS_CALLBACK_CLIENT_WRITEABLE",
"LWS_CALLBACK_SERVER_WRITEABLE",
"LWS_CALLBACK_HTTP",
"LWS_CALLBACK_BROADCAST",
"LWS_CALLBACK_FILTER_NETWORK_CONNECTION",
"LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION",
"LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS",
"LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS",
"LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION",
"LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER",
"LWS_CALLBACK_CONFIRM_EXTENSION_OKAY",
"LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED",
/* external poll() management support */
"LWS_CALLBACK_ADD_POLL_FD",
"LWS_CALLBACK_DEL_POLL_FD",
"LWS_CALLBACK_SET_MODE_POLL_FD",
"LWS_CALLBACK_CLEAR_MODE_POLL_FD"
};
if (reason > LWS_CALLBACK_CLEAR_MODE_POLL_FD) {
return "UNKNOWN";
}
return REASONS[reason];
}
int callback_http(struct libwebsocket_context *context,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *user,
void *in,
size_t len) {
char client_name[128];
char client_ip[128];
fprintf(stderr, "serving HTTP URI %s %s\n", (char *)in, reason2str(reason));
switch (reason) {
case LWS_CALLBACK_HTTP:
if (in && (0 == strcmp((const char*)in, "/favicon.ico"))) {
// if (libwebsockets_serve_http_file(wsi, "./favicon.ico", "image/x-icon")) {
// fprintf(stderr, "Failed to send favicon\n");
// }
break;
}
if (libwebsockets_serve_http_file(wsi, "./test.html", "text/html")) {
fprintf(stderr, "Failed to send HTTP file\n");
}
break;
case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
//return non-zero from here to kill the connection
libwebsockets_get_peer_addresses((int)(long)user,
client_name,
sizeof(client_name),
client_ip,
sizeof(client_ip));
fprintf(stderr, "Received network connect from %s (%s)\n", client_name, client_ip);
break;
default:
break;
}
return 0;
}
struct per_session_data__udp_mon_test {
size_t counter_packet_bytes_last;
size_t counter_packet_last;
};
struct ws_msg {
size_t len;
void *payload;
};
fifo<std::string> fifo_ws;
int callback_udp_mon_test(struct libwebsocket_context *context,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *user,
void *in,
size_t len) {
int n = 0;
//struct per_session_data__udp_mon_test *pss = (per_session_data__udp_mon_test*)user;
fprintf(stderr, "callback_udp_mon_test reason: %s\n", reason2str(reason));
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
fprintf(stderr, "LWS_CALLBACK_ESTABLISHED\n");
break;
case LWS_CALLBACK_BROADCAST:
if (fifo_ws.size()) {
const std::string msg = fifo_ws.pop();
n = libwebsocket_write(wsi,
(unsigned char *)msg.c_str(),// + LWS_SEND_BUFFER_PRE_PADDING,
msg.length(),
LWS_WRITE_TEXT);
if (n < 0) {
fprintf(stderr, "ERROR writing to socket");
exit(1);
}
//libwebsocket_rx_flow_control(wsi, 1);
libwebsocket_callback_on_writable(context, wsi);
}
break;
// NOTE: ATM we do not use these
case LWS_CALLBACK_RECEIVE:
case LWS_CALLBACK_SERVER_WRITEABLE:
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
break;
default:
break;
}
return 0;
}
// TODO: clean it up
void update_session_stats(const unsigned int update_interval) {
size_t counter_packet_bytes_last = 0;
size_t counter_packet_last = 0;
const float per_sec = 1000.0 / update_interval;
std::cout << "per_sec:" << per_sec << std::endl;
const boost::posix_time::milliseconds update_interval_ms(update_interval);
// NOTE: only tmp solution for the HTML5 WebSocket-s client will be moved (as planned) to a
// separate (plugin_websocket) file.
//
//first protocol must always be HTTP handler for ws handshake over HTTP
static struct libwebsocket_protocols protocols[] = {
{
"http-only", //name
callback_http, //callback
0 //per_session_data_size
},
{
"udp-mon-test-protocol",
callback_udp_mon_test,
sizeof(struct per_session_data__udp_mon_test) //TODO: fix this, some reasonable
},
{
NULL, NULL, 0
}
};
struct libwebsocket_context *context;
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 1024 + LWS_SEND_BUFFER_POST_PADDING];
int opts = 0;
char interface_name[128] = "";
const char *interface = 0;
int port = 7681;
context = libwebsocket_create_context(port,
interface,
protocols,
libwebsocket_internal_extensions,
NULL, //cert path for ssl
NULL, //key path for ssl
NULL,
-1,
-1,
opts,
NULL);
buf[LWS_SEND_BUFFER_PRE_PADDING] = 'x';
int n = 0;
bool use_ws = true;
if (NULL == context) {
std::cerr << "libwebsocket_create_context err, ws disabled..." << std::endl;
use_ws = false;
}
for (;;) {
boost::mutex::scoped_lock lock(counter_mutex); //1. lock the counters (or use some atomic access)
const size_t cp = counter_packet; //2. copy the values (or swap pointers - if there is a bigger structure in future)
const size_t cpb = counter_packet_bytes; // so the monitor thread is not waiting for no reason
lock.unlock(); //3. and unlock
// NOTE 1: There is *NO* need to do any calculation-s inside this app AT ALL, it could simply
// only collect and forward 'raw data' to the client-s (other processes) who could do
// their own calculations. Perhaps we could later add some --raw-mode config option
// that will do just that.
//
// NOTE 2: now do any 'calculations'/'updates', you want, ATM we have:
// - delta-s of received bytes and packet counters
// since the previous update
// - bit/s per seconds
//
const size_t p = (cp - counter_packet_last);
counter_packet_last = cp;
const size_t b = (cpb - counter_packet_bytes_last);
counter_packet_bytes_last = cpb;
const size_t bps = (b * 8) * per_sec;
// TODO: of course the session stats update and distribution/reporting/presentation of the updated
// stats are two separate/independent actions - this is in here only for simplicity, etc.
//
std::stringstream ss;
ss << cp << "," // total number of UDP packets received
<< cpb << "," // total number of bytes received
<< p << "," // number of UDP packets received since the last update
<< b << "," // number of bytes received since the last update
<< bps; // bit/s since the last update
std::string msg = ss.str();
std::cout << msg << std::endl;
if (use_ws) {
fifo_ws.push(msg);
libwebsockets_broadcast(&protocols[1],
&buf[LWS_SEND_BUFFER_PRE_PADDING],
1);
n = libwebsocket_service(context, 50);
std::cout << "n=" << n << std::endl;
}
boost::this_thread::sleep(update_interval_ms);
}
if (use_ws) {
libwebsocket_context_destroy(context);
}
}
}
int main(int argc, char* argv[]) {
// TODO: register signal handlers
if (argc < 2) {
std::cerr << MSG_HELP << std::endl;
return 0;
}
try {
const unsigned short port = std::strtol(argv[1], NULL, 10);
std::cout << "port:" << port << std::endl;
boost::asio::io_service io;
udp::socket sock_in(io, udp::endpoint(udp::v4(), port));
fifo<packet*> fifo_datagram;
fifo<packet*> fifo_monitor;
const unsigned int update_interval = 1000; //ms
boost::thread thread_update(update_session_stats,
update_interval);
boost::thread thread_monitor(monitor,
&fifo_monitor,
&fifo_datagram);
// TODO: static pool is the default, add a config option for a dynamic one (size of packet, number of packets to prealloc)
packet pool[STATIC_PACKET_POOL_SIZE];
for (size_t i = 0; i < STATIC_PACKET_POOL_SIZE; i++) {
// std::cout << i << ":" << sizeof (pool[i].data) << std::endl;
fifo_datagram.push(&pool[i]);
}
if (argc == 4) {
try {
udp::resolver resolver(io);
udp::resolver::query query(udp::v4(), argv[2], argv[3]);
udp::endpoint endpoint_forward(*resolver.resolve(query));
std::cout << "endpoint_forward:" << endpoint_forward << std::endl;
udp::socket sock_out(io, udp::v4());
//TODO: sock_out.connect(endpoint_forward); ... sock_out.send(); doesn't work?
fifo<packet*> fifo_forward;
boost::thread thread_forward(forward,
&sock_out,
&endpoint_forward,
&fifo_forward,
&fifo_monitor);
receive(&sock_in,
&fifo_datagram,
&fifo_forward);
} catch (const std::exception& e) {
std::cerr << "forward disabled:" << e.what() << std::endl;
}
}
receive(&sock_in,
&fifo_datagram,
&fifo_monitor);
} catch (const std::exception& e) {
std::cerr << "main:" << e.what() << std::endl;
}
std::cout << "END" << std::endl;
}
#ifndef FIFO_H
#define FIFO_H
#include <queue>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
template <typename T>
class fifo {
public:
void push(const T& t) {
boost::mutex::scoped_lock lock(mutex_);
const bool was_empty = queue_.empty();
queue_.push(t);
lock.unlock();
if (was_empty) {
condition_.notify_all();
}
}
T pop() {
boost::mutex::scoped_lock lock(mutex_);
while (queue_.empty()) {
condition_.wait(lock);
}
T t = queue_.front();
queue_.pop();
lock.unlock();
return t;
}
size_t size() {
boost::mutex::scoped_lock lock(mutex_);
return queue_.size();
}
private:
std::queue<T> queue_;
boost::condition condition_;
boost::mutex mutex_;
};
#endif //FIFO_H
# build and install libwebsockets (tag v1.0-chrome25-firefox17)
#
$ git clone git://git.warmcat.com/libwebsockets
$ cd libwebsockets
$ git checkout v1.0-chrome25-firefox17
$ ./autogen.sh
$ ./configure --prefix=/usr/local/libwebsockets-1.0
$ make
$ sudo make install
# add the installed libwebsockets to your PKG_CONFIG_PATH
#
$ export PKG_CONFIG_PATH=/usr/local/libwebsockets-1.0/lib/pkgconfig:$PKG_CONFIG_PATH
# build udp_mon
#
$ g++ udp_mon.cpp -o udp_mon -pthread -I /usr/local/boost_1_51_0/include -L /usr/local/boost_1_51_0/lib -lboost_system -lboost_chrono -lboost_thread `pkg-config libwebsockets --cflags --libs`
# add whatever you need to LD_LIBRARY_PATH if required
#
$ export LD_LIBRARY_PATH=/usr/local/boost_1_51_0/lib:/usr/local/libwebsockets-1.0/lib:$LD_LIBRARY_PATH
# and run udp_mon, for example:
#
$ ./udp_mon 2345
<html>
<head>
<title>test udp_mon</title>
</head>
<body>
<canvas id="udp_packets" width="400" height="100" data-test="blah"></canvas>
<br/>
input port:<input id="input_port" type ="test" size="10"></input>
<button type="button" onclick="onclick_handler_send()">SEND</button><button type="button" onclick="onclick_handler_stop()">STOP</button>
<br/>
<script type="text/javascript">
var get_ws_url = function(url) {
if (url.substring(0, 5) == "https") {
return "wss://".concat(url.substr(8).split('/')[0]);
} else if (url.substring(0, 4) == "http") {
return "ws://".concat(url.substr(7).split('/')[0]);
} else {
return "err"; //or throw an exception uknown URL/URI type
}
};
var sock; //only tmp solution, we will hide everything nicely into a closure
var onclick_handler_send = function () {
console.log("onclick_handler_send");
var input_port = document.getElementById('input_port').value,
udp_mon_ws_url = get_ws_url(document.URL),
fifo = new Array(400),
c = document.getElementById('udp_packets'),
ctx, i;
console.log(udp_mon_ws_url);
console.log(input_port);
try {
// NOTE: ctor throws SECURITY_ERR if the port is blocked
sock = new WebSocket(udp_mon_ws_url, "udp-mon-test-protocol");
sock.onopen = function(event) {
console.log("sock.onopen:" + event);
};
sock.onmessage = function (event) {
console.log("sock.onmessage:" + event.data);
};
sock.onerror = function (event) {
console.log("sock.onerror:" + event);
};
sock.onclose = function (event) {
console.log("sock.onclose:" + event.reason);
};
} catch (ex) {
alert('ERROR:' + ex);
}
};
var onclick_handler_stop = function () {
console.log("onclick_handler_stop");
if ((typeof(sock) !== 'undefined') && sock.close !== undefined) {
console.log("onclick_handler_stop calling sock.close()");
sock.close();
}
};
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment