Last active
December 12, 2015 02:08
-
-
Save mbohun/4696287 to your computer and use it in GitHub Desktop.
test
===
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cstdlib> //for strtol | |
#include <iostream> | |
#include <sstream> | |
#include <string> | |
#include <boost/asio.hpp> | |
#include <boost/chrono.hpp> | |
#include <boost/thread.hpp> | |
#include <boost/thread/mutex.hpp> | |
#include "fifo.h" | |
#include <libwebsockets.h> | |
using boost::asio::ip::udp; | |
namespace { | |
const std::string MSG_HELP = | |
"USAGE: ./udp_mon <monitoring input port> [<forward host> <forward port>]\n" | |
"\n" | |
"\tEXAMPLE:\n" | |
"\t./udp_mon 2345 monitor incoming UDP stream on port 2345\n" | |
"\t./udp_mon 2345 192.168.0.86 2346 monitor incoming UDP stream on port 2345\n" | |
"\t and forward it to 192.168.0.86:2346\n"; | |
enum { | |
// the max possible size of the UDP data chunk in bytes - ridiculous | |
UDP_DATA_SIZE_MAX = 65507, | |
// cat /sys/class/net/eth0/mtu => 1500 | |
UDP_DATA_SIZE_MAX_MTU = (1500 - (16 + 20 + 8)), | |
// this is the static 'default' MAX for the datagram data/payload | |
DATA_SIZE_MAX = UDP_DATA_SIZE_MAX_MTU, | |
// arbitrary size of statically pre-allocated 'pool' of struct packet | |
STATIC_PACKET_POOL_SIZE = 100 | |
}; | |
struct packet { | |
udp::endpoint src; //TODO: this is a possible problem/BUG if we want a POD (for easy/fast use with memcpy) | |
std::size_t len; | |
unsigned long long time_stamp; //what units? | |
unsigned char data[DATA_SIZE_MAX]; | |
}; | |
void receive(udp::socket* sock_in, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) { | |
try { | |
for (;;) { | |
packet *p = fifo_in->pop(); | |
p->len = sock_in->receive_from(boost::asio::buffer(p->data, DATA_SIZE_MAX), | |
p->src); | |
// TODO: this is a little overhead - all we need is save start time before entering this loop, and then record the ns/us diff | |
const boost::chrono::nanoseconds ns = | |
boost::chrono::duration_cast<boost::chrono::nanoseconds>(boost::chrono::system_clock::now().time_since_epoch()); | |
p->time_stamp = ns.count(); | |
fifo_out->push(p); | |
} | |
} catch (const std::exception& e) { | |
std::cerr << "receive exception:" << e.what() << std::endl; | |
return; | |
} | |
} | |
// TODO: this is a bit naive, forwarding and monitoring datagram are read only operations, | |
// therefore there is no reason why they shouldn't run in parallel | |
void forward(udp::socket* sock_out, udp::endpoint* remote, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) { | |
try { | |
for (;;) { | |
packet *p = fifo_in->pop(); | |
size_t remain = p->len; | |
do { | |
const size_t sent_bytes = | |
sock_out->send_to(boost::asio::buffer(p->data, remain), | |
*remote); | |
remain = remain - sent_bytes; | |
} while (remain > 0); | |
fifo_out->push(p); | |
} | |
} catch (const std::exception& e) { | |
std::cerr << "forward exception:" << e.what() << std::endl; | |
return; | |
} | |
} | |
boost::mutex counter_mutex; | |
size_t counter_packet = 0; | |
size_t counter_packet_bytes = 0; | |
// TODO: SEPARATE the reading of datagram data FROM what to do with it | |
void monitor(fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) { | |
// NOTE: the very first packet is used to setup/init a 'session' | |
packet *p = fifo_in->pop(); | |
const udp::endpoint endpoint_remote(p->src); | |
boost::mutex::scoped_lock lock(counter_mutex); | |
counter_packet = 1; | |
counter_packet_bytes = p->len; | |
lock.unlock(); | |
fifo_out->push(p); | |
for (;;) { | |
boost::this_thread::yield(); | |
p = fifo_in->pop(); | |
// read/copy datagram values | |
const udp::endpoint ep(p->src); | |
const size_t packet_bytes = p->len; | |
const bool is_ts = (0x47 == (unsigned char)p->data[0]); //NOTE: trivial but fully working example of payload 'analysis' | |
fifo_out->push(p); //release datagram (recycle) | |
boost::this_thread::yield(); | |
// verify remote endpoint - to avoid mixing datagrams stats from diff sources | |
if (endpoint_remote != ep) { | |
std::cerr << "receive DIFF endpoint:" << ep << std::endl; | |
break; //TODO: something more civilized - either post message or use some on_diff_endpoint_callback() | |
} | |
boost::mutex::scoped_lock lock(counter_mutex); | |
counter_packet++; | |
counter_packet_bytes += packet_bytes; | |
} | |
} | |
//TODO: dangerous idiotism - this works only if the list of enums remains unchanged, | |
// a proper sol. is std::map / std::pair<enum, string> | |
const char* reason2str(const enum libwebsocket_callback_reasons reason) { | |
static const char* REASONS[] = { | |
"LWS_CALLBACK_ESTABLISHED", | |
"LWS_CALLBACK_CLIENT_CONNECTION_ERROR", | |
"LWS_CALLBACK_CLIENT_ESTABLISHED", | |
"LWS_CALLBACK_CLOSED", | |
"LWS_CALLBACK_RECEIVE", | |
"LWS_CALLBACK_CLIENT_RECEIVE", | |
"LWS_CALLBACK_CLIENT_RECEIVE_PONG", | |
"LWS_CALLBACK_CLIENT_WRITEABLE", | |
"LWS_CALLBACK_SERVER_WRITEABLE", | |
"LWS_CALLBACK_HTTP", | |
"LWS_CALLBACK_BROADCAST", | |
"LWS_CALLBACK_FILTER_NETWORK_CONNECTION", | |
"LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION", | |
"LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS", | |
"LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS", | |
"LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION", | |
"LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER", | |
"LWS_CALLBACK_CONFIRM_EXTENSION_OKAY", | |
"LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED", | |
/* external poll() management support */ | |
"LWS_CALLBACK_ADD_POLL_FD", | |
"LWS_CALLBACK_DEL_POLL_FD", | |
"LWS_CALLBACK_SET_MODE_POLL_FD", | |
"LWS_CALLBACK_CLEAR_MODE_POLL_FD" | |
}; | |
if (reason > LWS_CALLBACK_CLEAR_MODE_POLL_FD) { | |
return "UNKNOWN"; | |
} | |
return REASONS[reason]; | |
} | |
int callback_http(struct libwebsocket_context *context, | |
struct libwebsocket *wsi, | |
enum libwebsocket_callback_reasons reason, | |
void *user, | |
void *in, | |
size_t len) { | |
char client_name[128]; | |
char client_ip[128]; | |
fprintf(stderr, "serving HTTP URI %s %s\n", (char *)in, reason2str(reason)); | |
switch (reason) { | |
case LWS_CALLBACK_HTTP: | |
if (in && (0 == strcmp((const char*)in, "/favicon.ico"))) { | |
// if (libwebsockets_serve_http_file(wsi, "./favicon.ico", "image/x-icon")) { | |
// fprintf(stderr, "Failed to send favicon\n"); | |
// } | |
break; | |
} | |
if (libwebsockets_serve_http_file(wsi, "./test.html", "text/html")) { | |
fprintf(stderr, "Failed to send HTTP file\n"); | |
} | |
break; | |
case LWS_CALLBACK_FILTER_NETWORK_CONNECTION: | |
//return non-zero from here to kill the connection | |
libwebsockets_get_peer_addresses((int)(long)user, | |
client_name, | |
sizeof(client_name), | |
client_ip, | |
sizeof(client_ip)); | |
fprintf(stderr, "Received network connect from %s (%s)\n", client_name, client_ip); | |
break; | |
default: | |
break; | |
} | |
return 0; | |
} | |
struct per_session_data__udp_mon_test { | |
size_t counter_packet_bytes_last; | |
size_t counter_packet_last; | |
}; | |
struct ws_msg { | |
size_t len; | |
void *payload; | |
}; | |
fifo<std::string> fifo_ws; | |
int callback_udp_mon_test(struct libwebsocket_context *context, | |
struct libwebsocket *wsi, | |
enum libwebsocket_callback_reasons reason, | |
void *user, | |
void *in, | |
size_t len) { | |
int n = 0; | |
//struct per_session_data__udp_mon_test *pss = (per_session_data__udp_mon_test*)user; | |
fprintf(stderr, "callback_udp_mon_test reason: %s\n", reason2str(reason)); | |
switch (reason) { | |
case LWS_CALLBACK_ESTABLISHED: | |
fprintf(stderr, "LWS_CALLBACK_ESTABLISHED\n"); | |
break; | |
case LWS_CALLBACK_BROADCAST: | |
if (fifo_ws.size()) { | |
const std::string msg = fifo_ws.pop(); | |
n = libwebsocket_write(wsi, | |
(unsigned char *)msg.c_str(),// + LWS_SEND_BUFFER_PRE_PADDING, | |
msg.length(), | |
LWS_WRITE_TEXT); | |
if (n < 0) { | |
fprintf(stderr, "ERROR writing to socket"); | |
exit(1); | |
} | |
//libwebsocket_rx_flow_control(wsi, 1); | |
libwebsocket_callback_on_writable(context, wsi); | |
} | |
break; | |
// NOTE: ATM we do not use these | |
case LWS_CALLBACK_RECEIVE: | |
case LWS_CALLBACK_SERVER_WRITEABLE: | |
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: | |
break; | |
default: | |
break; | |
} | |
return 0; | |
} | |
// TODO: clean it up | |
void update_session_stats(const unsigned int update_interval) { | |
size_t counter_packet_bytes_last = 0; | |
size_t counter_packet_last = 0; | |
const float per_sec = 1000.0 / update_interval; | |
std::cout << "per_sec:" << per_sec << std::endl; | |
const boost::posix_time::milliseconds update_interval_ms(update_interval); | |
// NOTE: only tmp solution for the HTML5 WebSocket-s client will be moved (as planned) to a | |
// separate (plugin_websocket) file. | |
// | |
//first protocol must always be HTTP handler for ws handshake over HTTP | |
static struct libwebsocket_protocols protocols[] = { | |
{ | |
"http-only", //name | |
callback_http, //callback | |
0 //per_session_data_size | |
}, | |
{ | |
"udp-mon-test-protocol", | |
callback_udp_mon_test, | |
sizeof(struct per_session_data__udp_mon_test) //TODO: fix this, some reasonable | |
}, | |
{ | |
NULL, NULL, 0 | |
} | |
}; | |
struct libwebsocket_context *context; | |
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 1024 + LWS_SEND_BUFFER_POST_PADDING]; | |
int opts = 0; | |
char interface_name[128] = ""; | |
const char *interface = 0; | |
int port = 7681; | |
context = libwebsocket_create_context(port, | |
interface, | |
protocols, | |
libwebsocket_internal_extensions, | |
NULL, //cert path for ssl | |
NULL, //key path for ssl | |
NULL, | |
-1, | |
-1, | |
opts, | |
NULL); | |
buf[LWS_SEND_BUFFER_PRE_PADDING] = 'x'; | |
int n = 0; | |
bool use_ws = true; | |
if (NULL == context) { | |
std::cerr << "libwebsocket_create_context err, ws disabled..." << std::endl; | |
use_ws = false; | |
} | |
for (;;) { | |
boost::mutex::scoped_lock lock(counter_mutex); //1. lock the counters (or use some atomic access) | |
const size_t cp = counter_packet; //2. copy the values (or swap pointers - if there is a bigger structure in future) | |
const size_t cpb = counter_packet_bytes; // so the monitor thread is not waiting for no reason | |
lock.unlock(); //3. and unlock | |
// NOTE 1: There is *NO* need to do any calculation-s inside this app AT ALL, it could simply | |
// only collect and forward 'raw data' to the client-s (other processes) who could do | |
// their own calculations. Perhaps we could later add some --raw-mode config option | |
// that will do just that. | |
// | |
// NOTE 2: now do any 'calculations'/'updates', you want, ATM we have: | |
// - delta-s of received bytes and packet counters | |
// since the previous update | |
// - bit/s per seconds | |
// | |
const size_t p = (cp - counter_packet_last); | |
counter_packet_last = cp; | |
const size_t b = (cpb - counter_packet_bytes_last); | |
counter_packet_bytes_last = cpb; | |
const size_t bps = (b * 8) * per_sec; | |
// TODO: of course the session stats update and distribution/reporting/presentation of the updated | |
// stats are two separate/independent actions - this is in here only for simplicity, etc. | |
// | |
std::stringstream ss; | |
ss << cp << "," // total number of UDP packets received | |
<< cpb << "," // total number of bytes received | |
<< p << "," // number of UDP packets received since the last update | |
<< b << "," // number of bytes received since the last update | |
<< bps; // bit/s since the last update | |
std::string msg = ss.str(); | |
std::cout << msg << std::endl; | |
if (use_ws) { | |
fifo_ws.push(msg); | |
libwebsockets_broadcast(&protocols[1], | |
&buf[LWS_SEND_BUFFER_PRE_PADDING], | |
1); | |
n = libwebsocket_service(context, 50); | |
std::cout << "n=" << n << std::endl; | |
} | |
boost::this_thread::sleep(update_interval_ms); | |
} | |
if (use_ws) { | |
libwebsocket_context_destroy(context); | |
} | |
} | |
} | |
int main(int argc, char* argv[]) { | |
// TODO: register signal handlers | |
if (argc < 2) { | |
std::cerr << MSG_HELP << std::endl; | |
return 0; | |
} | |
try { | |
const unsigned short port = std::strtol(argv[1], NULL, 10); | |
std::cout << "port:" << port << std::endl; | |
boost::asio::io_service io; | |
udp::socket sock_in(io, udp::endpoint(udp::v4(), port)); | |
fifo<packet*> fifo_datagram; | |
fifo<packet*> fifo_monitor; | |
const unsigned int update_interval = 1000; //ms | |
boost::thread thread_update(update_session_stats, | |
update_interval); | |
boost::thread thread_monitor(monitor, | |
&fifo_monitor, | |
&fifo_datagram); | |
// TODO: static pool is the default, add a config option for a dynamic one (size of packet, number of packets to prealloc) | |
packet pool[STATIC_PACKET_POOL_SIZE]; | |
for (size_t i = 0; i < STATIC_PACKET_POOL_SIZE; i++) { | |
// std::cout << i << ":" << sizeof (pool[i].data) << std::endl; | |
fifo_datagram.push(&pool[i]); | |
} | |
if (argc == 4) { | |
try { | |
udp::resolver resolver(io); | |
udp::resolver::query query(udp::v4(), argv[2], argv[3]); | |
udp::endpoint endpoint_forward(*resolver.resolve(query)); | |
std::cout << "endpoint_forward:" << endpoint_forward << std::endl; | |
udp::socket sock_out(io, udp::v4()); | |
//TODO: sock_out.connect(endpoint_forward); ... sock_out.send(); doesn't work? | |
fifo<packet*> fifo_forward; | |
boost::thread thread_forward(forward, | |
&sock_out, | |
&endpoint_forward, | |
&fifo_forward, | |
&fifo_monitor); | |
receive(&sock_in, | |
&fifo_datagram, | |
&fifo_forward); | |
} catch (const std::exception& e) { | |
std::cerr << "forward disabled:" << e.what() << std::endl; | |
} | |
} | |
receive(&sock_in, | |
&fifo_datagram, | |
&fifo_monitor); | |
} catch (const std::exception& e) { | |
std::cerr << "main:" << e.what() << std::endl; | |
} | |
std::cout << "END" << std::endl; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef FIFO_H | |
#define FIFO_H | |
#include <queue> | |
#include <boost/thread/condition.hpp> | |
#include <boost/thread/mutex.hpp> | |
template <typename T> | |
class fifo { | |
public: | |
void push(const T& t) { | |
boost::mutex::scoped_lock lock(mutex_); | |
const bool was_empty = queue_.empty(); | |
queue_.push(t); | |
lock.unlock(); | |
if (was_empty) { | |
condition_.notify_all(); | |
} | |
} | |
T pop() { | |
boost::mutex::scoped_lock lock(mutex_); | |
while (queue_.empty()) { | |
condition_.wait(lock); | |
} | |
T t = queue_.front(); | |
queue_.pop(); | |
lock.unlock(); | |
return t; | |
} | |
size_t size() { | |
boost::mutex::scoped_lock lock(mutex_); | |
return queue_.size(); | |
} | |
private: | |
std::queue<T> queue_; | |
boost::condition condition_; | |
boost::mutex mutex_; | |
}; | |
#endif //FIFO_H |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# build and install libwebsockets (tag v1.0-chrome25-firefox17) | |
# | |
$ git clone git://git.warmcat.com/libwebsockets | |
$ cd libwebsockets | |
$ git checkout v1.0-chrome25-firefox17 | |
$ ./autogen.sh | |
$ ./configure --prefix=/usr/local/libwebsockets-1.0 | |
$ make | |
$ sudo make install | |
# add the installed libwebsockets to your PKG_CONFIG_PATH | |
# | |
$ export PKG_CONFIG_PATH=/usr/local/libwebsockets-1.0/lib/pkgconfig:$PKG_CONFIG_PATH | |
# build udp_mon | |
# | |
$ g++ udp_mon.cpp -o udp_mon -pthread -I /usr/local/boost_1_51_0/include -L /usr/local/boost_1_51_0/lib -lboost_system -lboost_chrono -lboost_thread `pkg-config libwebsockets --cflags --libs` | |
# add whatever you need to LD_LIBRARY_PATH if required | |
# | |
$ export LD_LIBRARY_PATH=/usr/local/boost_1_51_0/lib:/usr/local/libwebsockets-1.0/lib:$LD_LIBRARY_PATH | |
# and run udp_mon, for example: | |
# | |
$ ./udp_mon 2345 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<html> | |
<head> | |
<title>test udp_mon</title> | |
</head> | |
<body> | |
<canvas id="udp_packets" width="400" height="100" data-test="blah"></canvas> | |
<br/> | |
input port:<input id="input_port" type ="test" size="10"></input> | |
<button type="button" onclick="onclick_handler_send()">SEND</button><button type="button" onclick="onclick_handler_stop()">STOP</button> | |
<br/> | |
<script type="text/javascript"> | |
var get_ws_url = function(url) { | |
if (url.substring(0, 5) == "https") { | |
return "wss://".concat(url.substr(8).split('/')[0]); | |
} else if (url.substring(0, 4) == "http") { | |
return "ws://".concat(url.substr(7).split('/')[0]); | |
} else { | |
return "err"; //or throw an exception uknown URL/URI type | |
} | |
}; | |
var sock; //only tmp solution, we will hide everything nicely into a closure | |
var onclick_handler_send = function () { | |
console.log("onclick_handler_send"); | |
var input_port = document.getElementById('input_port').value, | |
udp_mon_ws_url = get_ws_url(document.URL), | |
fifo = new Array(400), | |
c = document.getElementById('udp_packets'), | |
ctx, i; | |
console.log(udp_mon_ws_url); | |
console.log(input_port); | |
try { | |
// NOTE: ctor throws SECURITY_ERR if the port is blocked | |
sock = new WebSocket(udp_mon_ws_url, "udp-mon-test-protocol"); | |
sock.onopen = function(event) { | |
console.log("sock.onopen:" + event); | |
}; | |
sock.onmessage = function (event) { | |
console.log("sock.onmessage:" + event.data); | |
}; | |
sock.onerror = function (event) { | |
console.log("sock.onerror:" + event); | |
}; | |
sock.onclose = function (event) { | |
console.log("sock.onclose:" + event.reason); | |
}; | |
} catch (ex) { | |
alert('ERROR:' + ex); | |
} | |
}; | |
var onclick_handler_stop = function () { | |
console.log("onclick_handler_stop"); | |
if ((typeof(sock) !== 'undefined') && sock.close !== undefined) { | |
console.log("onclick_handler_stop calling sock.close()"); | |
sock.close(); | |
} | |
}; | |
</script> | |
</body> | |
</html> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment