Created
January 10, 2014 06:59
-
-
Save moteus/8347993 to your computer and use it in GitHub Desktop.
Binary zeromq message class.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef __BZMSG_H_INCLUDED__ | |
#define __BZMSG_H_INCLUDED__ | |
#include <cassert>
#include <cstddef>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <list>
#include <stdexcept>
#include <string>
#include <vector>
#include <zmq.hpp>
#include <boost/tr1/memory.hpp> | |
using std::tr1::shared_ptr; | |
#include <boost/static_assert.hpp> | |
#define STATIC_ASSERT BOOST_STATIC_ASSERT | |
#include <boost/cstdint.hpp> | |
using boost::uint8_t; | |
using boost::uint16_t; | |
using boost::uint32_t; | |
using boost::uint64_t; | |
using boost::int8_t; | |
using boost::int16_t; | |
using boost::int32_t; | |
using boost::int64_t; | |
class bzmsg{ | |
public: | |
typedef shared_ptr<bzmsg> pointer; | |
class row_t{ | |
public: | |
typedef shared_ptr<row_t> pointer; | |
public: | |
row_t(){ | |
} | |
row_t(const row_t&rhs){ | |
this->row_ = rhs.row_; | |
} | |
explicit row_t(const char*str){ | |
this->set(str); | |
} | |
explicit row_t(const std::string &str){ | |
this->set(str); | |
} | |
explicit row_t(const byte* ptr, size_t len){ | |
this->set(ptr,len); | |
} | |
explicit row_t(const zmq::message_t &msg){ | |
this->set(msg); | |
} | |
row_t& operator= (const row_t&rhs){ | |
this->row_ = rhs.row_; | |
return *this; | |
} | |
void set(const char*str){ | |
size_t len = strlen(str); | |
row_.resize(len); | |
std::copy(&str[0],&str[len], row_.begin()); | |
} | |
void set(const std::string &str){ | |
row_.resize(str.size()); | |
std::copy(str.begin(), str.end(), row_.begin()); | |
} | |
void set(const byte* ptr, size_t len){ | |
row_.resize(len); | |
std::copy(&ptr[0],&ptr[len], row_.begin()); | |
} | |
void set(const zmq::message_t &msg){ | |
size_t len = msg.size(); | |
const byte* ptr = static_cast<const byte*>(msg.data()); | |
row_.resize(len); | |
std::copy(&ptr[0], &ptr[len], row_.begin()); | |
} | |
size_t size() const{ | |
return row_.size(); | |
} | |
byte* data(){ | |
return size() ? &row_[0] : 0; | |
} | |
const byte* data() const{ | |
return size() ? &row_[0] : 0; | |
} | |
bool is_str() const{ | |
return (size() > 0) && ('\0' == data()[size()-1]); | |
} | |
bool is_uuid() const{ | |
return (size() == 17) && (data()[0] == 0); | |
} | |
bool is_str_uuid() const{ | |
return (size() == 34) && (data()[0] == '@') && is_str(); | |
} | |
const char* c_str() const{ | |
assert(is_str()); | |
return reinterpret_cast<const char*>(this->data()); | |
} | |
char* c_str(){ | |
assert(is_str()); | |
return reinterpret_cast<char*>(this->data()); | |
} | |
char* to_str(){ | |
if(!is_str()){ | |
row_.resize(size()+1); | |
row_[size()-1] = '\0'; | |
} | |
assert(is_str()); | |
return this->c_str(); | |
} | |
void clear(){ | |
row_.clear(); | |
} | |
bool is_equal(const char *str)const{ | |
size_t len = strlen(str); | |
// we ignore last zero if row is string | |
size_t data_len = is_str() ? size()-1 : size(); | |
if(len != data_len) return false; | |
return 0 == data_cmp(str,len); | |
} | |
bool is_equal(const byte *ptr, size_t bytes)const{ | |
if(bytes != size()) | |
return false; | |
return 0 == data_cmp(ptr, bytes); | |
} | |
bool is_equal(const row_t &rhs)const{ | |
return is_equal(rhs.data(),rhs.size()); | |
} | |
bool is_startwith(const char *str)const{ | |
size_t len = strlen(str); | |
if(size() < len) | |
return false; | |
return 0 == data_cmp(str, len); | |
} | |
bool is_less(const byte *ptr, size_t bytes)const{ | |
const size_t sz = size(); | |
const int r = data_cmp(ptr, min(sz,bytes)); | |
if(r == 0) return sz < bytes; | |
return r < 0; | |
} | |
bool is_less(const row_t &rhs)const{ | |
return is_less(rhs.data(),rhs.size()); | |
} | |
private: | |
template <typename BYTE_TYPE> | |
int data_cmp(const BYTE_TYPE* ptr, size_t bytes)const{ | |
STATIC_ASSERT(sizeof(BYTE_TYPE) == 1); | |
assert(bytes <= size()); | |
return memcmp(data(), (byte*)ptr, bytes); | |
} | |
private: | |
std::vector<byte> row_; | |
public: | |
struct is_less_fn{ | |
bool operator()(const row_t::pointer lhs, const row_t::pointer rhs)const{return lhs->is_less(*rhs);} | |
}; | |
}; | |
typedef row_t::pointer row_ptr; | |
typedef std::list<row_ptr> rows_t; | |
public: | |
size_t parts()const{ | |
return rows_.size(); | |
} | |
void clear(){ | |
rows_.clear(); | |
} | |
void push_front(){ | |
rows_.push_front(row_ptr(new row_t())); | |
} | |
void push_front(row_ptr part){ | |
rows_.push_front(part); | |
} | |
void push_front(const char *part){ | |
rows_.push_front(row_ptr(new row_t(part))); | |
} | |
void push_front(const std::string &part){ | |
rows_.push_front(row_ptr(new row_t(part))); | |
} | |
void push_front(const byte *ptr, size_t len){ | |
rows_.push_front(row_ptr(new row_t(ptr, len))); | |
} | |
void push_front(const char *ptr, size_t len){ | |
rows_.push_front(row_ptr(new row_t((byte*)ptr, len))); | |
} | |
void push_front(const zmq::message_t &msg){ | |
rows_.push_front(row_ptr(new row_t(msg))); | |
} | |
void push_back(){ | |
rows_.push_back(row_ptr(new row_t())); | |
} | |
void push_back(row_ptr part){ | |
rows_.push_back(part); | |
} | |
void push_back(const char *part){ | |
rows_.push_back(row_ptr(new row_t(part))); | |
} | |
void push_back(const std::string &part){ | |
rows_.push_back(row_ptr(new row_t(part))); | |
} | |
void push_back(const byte*ptr, size_t len){ | |
rows_.push_back(row_ptr(new row_t(ptr, len))); | |
} | |
void push_back(const char*ptr, size_t len){ | |
rows_.push_back(row_ptr(new row_t((byte*)ptr, len))); | |
} | |
void push_back(const zmq::message_t &msg){ | |
rows_.push_back(row_ptr(new row_t(msg))); | |
} | |
row_ptr pop_front(){ | |
if(this->parts() == 0){ | |
throw std::out_of_range("message is empty"); | |
} | |
row_ptr part = top(); | |
rows_.pop_front(); | |
return part; | |
} | |
row_ptr pop_back(){ | |
if(this->parts() == 0){ | |
throw std::out_of_range("message is empty"); | |
} | |
row_ptr part = back(); | |
rows_.pop_back(); | |
return part; | |
} | |
row_ptr top(){ | |
if(this->parts() == 0){ | |
throw std::out_of_range("message is empty"); | |
} | |
return rows_.front(); | |
} | |
const row_ptr top()const{ | |
if(this->parts() == 0){ | |
throw std::out_of_range("message is empty"); | |
} | |
return rows_.front(); | |
} | |
row_ptr back(){ | |
if(this->parts() == 0){ | |
throw std::out_of_range("message is empty"); | |
} | |
return rows_.back(); | |
} | |
const row_ptr back()const{ | |
if(this->parts() == 0){ | |
throw std::out_of_range("message is empty"); | |
} | |
return rows_.back(); | |
} | |
rows_t::const_iterator begin()const{ | |
return rows_.begin(); | |
} | |
rows_t::const_reverse_iterator rbegin()const{ | |
return rows_.rbegin(); | |
} | |
rows_t::const_iterator end()const{ | |
return rows_.end(); | |
} | |
rows_t::const_reverse_iterator rend()const{ | |
return rows_.rend(); | |
} | |
template<typename A, typename D> | |
void wrap(A addr, D delim){ | |
if(delim){ | |
push_front(delim); | |
} | |
push_front(addr); | |
} | |
void wrap(bzmsg::pointer addr, const char *delim){ | |
if(delim){ | |
push_front(delim); | |
} | |
while(addr->parts()){ | |
row_ptr part = addr->pop_back(); | |
push_front(part); | |
} | |
} | |
bzmsg::pointer unwrap(){ | |
bzmsg::pointer addr = bzmsg::pointer(new bzmsg); | |
while(parts()){ | |
row_ptr part = pop_front(); | |
if(part->size() == 0) | |
break; | |
addr->push_back(part); | |
} | |
return addr; | |
} | |
row_ptr address(){ | |
if(parts() == 0) | |
return row_ptr(); | |
return top(); | |
} | |
const row_ptr address() const{ | |
if(parts() == 0) | |
return row_ptr(); | |
return top(); | |
} | |
row_ptr body(){ | |
if(parts() == 0) | |
return row_ptr(); | |
return rows_.back(); | |
} | |
const row_ptr body() const{ | |
if(parts() == 0) | |
return row_ptr(); | |
return rows_.back(); | |
} | |
bool recv(zmq::socket_t & socket){ | |
rows_t::iterator it = rows_.begin(); | |
rows_t::iterator en = rows_.end(); | |
bool is_end = (it == en); | |
bool res = true; | |
while(1){ | |
zmq::message_t message(0); | |
while(!socket.recv(&message, 0));//EAGAIN | |
if(is_end){ | |
push_back(message); | |
} | |
else{ | |
(*it)->set(message); | |
++it; | |
is_end = (it == en); | |
} | |
int64_t more; | |
size_t more_size = sizeof(more); | |
socket.getsockopt(ZMQ_RCVMORE, &more, &more_size); | |
if(!more){ | |
break; | |
} | |
} | |
if(!is_end) | |
rows_.erase(it, en); | |
return res; | |
} | |
bool send(zmq::socket_t & socket) const{ | |
for(rows_t::const_iterator it=rows_.begin(), en = rows_.end(); it!=en;){ | |
zmq::message_t message((*it)->size()); | |
memcpy(message.data(), (*it)->data(), (*it)->size()); | |
++it; | |
while(!socket.send(message, (it == en) ? 0 : ZMQ_SNDMORE));//EAGAIN | |
} | |
return true; | |
} | |
void dump(std::ostream&ost = std::cerr)const; | |
private: | |
rows_t rows_; | |
}; | |
inline | |
std::ostream& operator << (std::ostream&ost,const bzmsg::row_t& r){ | |
const byte *data = r.data(); | |
size_t size = r.size(); | |
// Dump the message as text or binary | |
int is_text = 1; | |
for (unsigned int char_nbr = 0; char_nbr < size; char_nbr++) | |
if (data [char_nbr] < 32 || data [char_nbr] > 127) | |
is_text = 0; | |
for (unsigned int char_nbr = 0; char_nbr < size; char_nbr++) { | |
if (is_text) { | |
ost << (char) data [char_nbr]; | |
} else { | |
ost << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr]; | |
} | |
} | |
return ost; | |
} | |
inline | |
std::ostream& operator << (std::ostream&ost, const bzmsg& msg){ | |
msg.dump(ost); | |
return ost; | |
} | |
inline | |
void bzmsg::dump(std::ostream&ost)const{ | |
ost << "--------------------------------------" << std::endl; | |
for(rows_t::const_iterator it=rows_.begin(), en = rows_.end(); it!=en; ++it){ | |
size_t size = (*it)->size(); | |
ost << "[" << std::setw(3) << std::setfill('0') << (int) size << "] " | |
<< **it << std::endl; | |
} | |
} | |
// Namespace-scope shorthand for bzmsg::pointer, the shared_ptr handle to a bzmsg.
typedef bzmsg::pointer bzmsg_ptr; | |
#endif /* __BZMSG_H_INCLUDED__ */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment