Skip to content

Instantly share code, notes, and snippets.

@jamesu
Created February 5, 2014 01:39
Show Gist options
  • Save jamesu/8815991 to your computer and use it in GitHub Desktop.
Save jamesu/8815991 to your computer and use it in GitHub Desktop.
An experiment in implementing the mongodb server protocol in poco
// Copyright (C) 2013 James S Urquhart.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
// NOTE: compile with "g++ main.cpp -lPocoNet -lPocoUtil -lPocoFoundation -lmongoc -o test -DMONGO_HAVE_STDINT"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/NetException.h"
#include "Poco/NObserver.h"
#include "Poco/Exception.h"
#include "Poco/Buffer.h"
#include "Poco/ByteOrder.h"
#include "Poco/Thread.h"
#include "Poco/MemoryStream.h"
#include "Poco/Util/ServerApplication.h"
#include "Poco/Util/Option.h"
#include "Poco/Util/OptionSet.h"
#include "Poco/Util/HelpFormatter.h"
#include <iostream>
#include <vector>
#include "mongo.h"
using Poco::Net::SocketReactor;
using Poco::Net::SocketAcceptor;
using Poco::Net::ReadableNotification;
using Poco::Net::ErrorNotification;
using Poco::Net::TimeoutNotification;
using Poco::Net::IdleNotification;
using Poco::Net::ShutdownNotification;
using Poco::Net::ServerSocket;
using Poco::Net::StreamSocket;
using Poco::NObserver;
using Poco::AutoPtr;
using Poco::Thread;
using Poco::Util::ServerApplication;
using Poco::Util::Application;
using Poco::Util::Option;
using Poco::Util::OptionSet;
using Poco::Util::HelpFormatter;
using Poco::Buffer;
using Poco::ByteOrder;
using Poco::MemoryInputStream;
using Poco::MemoryOutputStream;
using Poco::MemoryIOS;
#define MONGO_OP_REPLY 1
#define MONGO_OP_OP_MSG 1000
#define MONGO_OP_UPDATE 2001
#define MONGO_OP_INSERT 2002
#define MONGO_OP_RESERVED 2003
#define MONGO_OP_QUERY 2004
#define MONGO_OP_GETMORE 2005
#define MONGO_OP_DELETE 2006
#define MONGO_OP_KILL_CURSORS 2007
typedef int int32;
typedef struct MongoMsgHeader {
int32 messageLength; // total message size, including this
int32 requestID; // identifier for this message
int32 responseTo; // requestID from the original request
// (used in reponses from db)
int32 opCode; // request type - see table below
} MongoMsgHeader;
// Need our own Memory stream since POCO's is terrible
class Stream
{
public:
Stream() {;}
virtual ~Stream() {;}
virtual bool _write(int nBytes, const void *data) = 0;
virtual bool _read(int nBytes, void *data) = 0;
virtual int getPosition() { return 0; }
virtual void setPosition(int pos) { ; }
virtual bool isEOF() { return true; }
virtual int getStreamSize() { return 0; }
bool readString(std::string &str)
{
char buffer[1024];
char *ptr = buffer;
while (_read(1, ptr)) {
if (*ptr++ == '\0')
break;
}
str = buffer;
}
bool writeString(std::string &str)
{
int len = str.length();
return _write(len+1, str.c_str());
}
template<class T> bool read(T &var) { T rvar; bool ret = _read(sizeof(T), &rvar); var = ByteOrder::toLittleEndian(rvar); return ret; };
template<class T> bool write(T var) { T wvar = ByteOrder::fromLittleEndian(var); return _write(sizeof(T), &wvar); };
};
class MemoryStream : public Stream
{
public:
int _curSize;
int _curPos;
char *_data;
MemoryStream(char *buffer, int size) : _curSize(size), _curPos(0)
{
_data = buffer;
}
~MemoryStream()
{
//delete[] _data;
}
bool _write(int nBytes, const void *data)
{
int bytesLeft = _curSize - _curPos;
int bytesToWrite = bytesLeft > nBytes ? nBytes : bytesLeft;
if (bytesToWrite > 0) {
memcpy(_data + _curPos, data, bytesToWrite);
_curPos += bytesToWrite;
}
return bytesToWrite == nBytes;
}
bool _read(int nBytes, void *data)
{
int bytesLeft = _curSize - _curPos;
int bytesToRead = bytesLeft > nBytes ? nBytes : bytesLeft;
if (bytesToRead > 0) {
memcpy(data, _data + _curPos, bytesToRead);
_curPos += bytesToRead;
}
return bytesToRead == nBytes;
}
virtual int getPosition() { return _curPos; }
virtual void setPosition(int pos) { if (pos >= 0 && pos <= _curSize) _curPos = pos; }
virtual bool isEOF() { return _curPos >= _curSize; }
virtual int getStreamSize() { return _curSize; }
};
class BSONBuilder
{
public:
bson _data;
BSONBuilder() { bson_init(&_data); }
~BSONBuilder() { bson_destroy(&_data); }
bson *data() { return &_data; }
void addField(const char *name, bson_oid_t *value)
{
bson_append_oid(&_data, name, value);
}
void addField(const char *name, int value)
{
bson_append_int(&_data, name, value);
}
void addField(const char *name, double value)
{
bson_append_double(&_data, name, value);
}
void addField(const char *name, int64_t value)
{
bson_append_long(&_data, name, value);
}
void addField(const char *name, const char *value)
{
bson_append_string(&_data, name, value);
}
void addField(const char *name, char type, char *data, int len)
{
bson_append_binary(&_data, name, type, data, len);
}
void addField(const char *name, bool value)
{
bson_append_bool(&_data, name, value);
}
void addNullField(const char *name)
{
bson_append_null(&_data, name);
}
void addUndefinedField(const char *name)
{
bson_append_null(&_data, name);
}
void addCodeField(const char *name, const char *value, const bson *scope)
{
if (scope)
bson_append_code_w_scope(&_data, name, value, scope);
else
bson_append_code(&_data, name, value);
}
void finish()
{
bson_finish(&_data);
}
};
class MongoDocument
{
public:
bson _data;
char *_bytes;
MongoDocument() : _bytes(0)
{
}
~MongoDocument()
{
if (_bytes) bson_destroy(&_data);
}
void setDocument(bson *data)
{
bson_init(&_data);
bson_copy(&_data, data);
_bytes = (char*)bson_data(&_data);
}
bool read(Stream &in)
{
int size = 0;
in.read(size);
printf("Document size == %i\n", size);
_bytes = new char[size+4];
in.setPosition(in.getPosition()-4);
in._read(size, _bytes);
bson_init_data(&_data, _bytes);
return true;
}
bool write(Stream &out)
{
bson_finish(&_data);
_bytes = (char*)bson_data(&_data);
out._write(bson_size(&_data), _bytes);
return true;
}
};
class MongoRequest
{
public:
MongoMsgHeader _header;
MongoRequest() {_header.opCode = 0; _header.responseTo = 0;}
virtual ~MongoRequest() {;}
virtual bool read(Stream &in) = 0;
virtual bool write(Stream &out) = 0;
};
#define MONGO_QUERY_RESERVED 0x1
#define MONGO_QUERY_TAILABLECURSOR 0x2
#define MONGO_QUERY_SLAVEOK 0x4
#define MONGO_QUERY_OPLOGREPLAY 0x8
#define MONGO_QUERY_NOCURSORTIMEOUT 0x10
#define MONGO_QUERY_AWAITDATA 0x20
#define MONGO_QUERY_EXHAUST 0x40
#define MONGO_QUERY_PARTIAL 0x80
class MongoQuery : public MongoRequest
{
public:
int _flags;
std::string _fullCollectionName;
int _numberToSkip;
int _numberToReturn;
MongoDocument *_query;
MongoDocument *_returnFieldSelector;
MongoQuery() : _flags(0), _numberToSkip(0), _numberToReturn(0), _query(0), _returnFieldSelector(0)
{
_header.opCode = MONGO_OP_QUERY;
}
~MongoQuery()
{
if (_query) delete _query;
if (_returnFieldSelector) delete _returnFieldSelector;
}
virtual bool read(Stream &in)
{
printf("READ START %i, size == %i\n", in.getPosition(), in.getStreamSize());
in.read(_flags);
in.readString(_fullCollectionName);
in.read(_numberToSkip);
in.read(_numberToReturn);
printf("READ READ %i '%s' %i %i @[%i]\n", _flags, _fullCollectionName.c_str(), _numberToSkip, _numberToReturn, in.getPosition());
// Actual query
printf("Reading _query...\n");
_query = new MongoDocument();
if (!_query->read(in)) {
delete _query;
_query = NULL;
return false;
}
// Optional: returnFieldSelector
printf("Position is now %i\n", in.getPosition());
if (!in.isEOF()) {
printf("Read returnFieldSelector\n");
_returnFieldSelector = new MongoDocument();
if (!_returnFieldSelector->read(in)) {
delete _returnFieldSelector;
delete _query;
_returnFieldSelector = _query = NULL;
return false;
}
} else {
_returnFieldSelector = NULL;
}
return true;
}
virtual bool write(Stream &out)
{
out.write(_flags);
out.writeString(_fullCollectionName);
out.write(_numberToSkip);
out.write(_numberToReturn);
// Actual query
if (_query == NULL || !_query->write(out)) {
return false;
}
// Optional: returnFieldSelector
if (_returnFieldSelector && !_returnFieldSelector->write(out)) {
return false;
}
return true;
}
};
class MongoInsert : public MongoRequest
{
public:
int _flags;
std::string _fullCollectionName;
std::vector<MongoDocument*> _documents;
MongoInsert() : _flags(0)
{
_header.opCode = MONGO_OP_INSERT;
}
~MongoInsert()
{
for (int i=0; i<_documents.size(); i++) {
delete _documents[i];
}
}
virtual bool read(Stream &in)
{
in.read(_flags);
in.readString(_fullCollectionName);
// Read documents until there are none left
while (!in.isEOF()) {
MongoDocument *document = new MongoDocument();
if (!document->read(in)) {
delete document;
break;
}
_documents.push_back(document);
}
// No documents read? Fail!
if (_documents.size() == 0)
return false;
return true;
}
virtual bool write(Stream &out)
{
out.write(_flags);
out.writeString(_fullCollectionName);
// Write all the documents
for (int i=0; i<_documents.size(); i++) {
_documents[i]->write(out);
}
return true;
}
};
#define MONGO_REPLY_CURSORNOTFOUND 0x1
#define MONGO_REPLY_QUERYFAILURE 0x2
#define MONGO_REPLY_SHARDCONFIGSTATE 0x4
#define MONGO_REPLY_AWAITCAPABLE 0x8
class MongoReply : public MongoRequest
{
public:
int _flags;
signed long _cursorID;
int _startingFrom;
std::vector<MongoDocument*> _documents;
MongoReply() : _flags(0), _cursorID(0), _startingFrom(0)
{
_header.opCode = MONGO_OP_REPLY;
}
~MongoReply()
{
for (int i=0; i<_documents.size(); i++) {
delete _documents[i];
}
}
void appendDocument(bson *data)
{
MongoDocument *document = new MongoDocument();
document->setDocument(data);
_documents.push_back(document);
}
virtual bool read(Stream &in)
{
int numDocuments;
in.read(_flags);
in.read(_cursorID);
in.read(_startingFrom);
in.read(numDocuments);
// Read numDocuments documents
_documents.reserve(numDocuments);
for (int i=0; i<numDocuments; i++) {
MongoDocument *document = new MongoDocument();
if (!document->read(in)) {
delete document;
continue;
}
_documents.push_back(document);
}
// Incorrect number of documents read? Fail!
if (numDocuments != _documents.size())
return false;
return true;
}
virtual bool write(Stream &out)
{
int numDocuments = _documents.size();
out.write(_flags);
out.write(_cursorID);
out.write(_startingFrom);
out.write(numDocuments);
// Write all the documents
for (int i=0; i<_documents.size(); i++) {
_documents[i]->write(out);
}
return true;
}
};
class MongoUpdate : public MongoRequest
{
public:
int _flags;
std::string _fullCollectionName;
MongoDocument *_selector;
MongoDocument *_update;
MongoUpdate() : _flags(0), _selector(0), _update(0)
{
_header.opCode = MONGO_OP_UPDATE;
}
~MongoUpdate()
{
if (_selector) delete _selector;
if (_update) delete _update;
}
virtual bool read(Stream &in)
{
int zero=0;
in.read(zero);
in.readString(_fullCollectionName);
in.read(_flags);
// Actual query
_selector = new MongoDocument();
if (!_selector->read(in)) {
delete _selector;
_selector = NULL;
return false;
}
// Update document
_update = new MongoDocument();
if (!_update->read(in)) {
delete _selector;
delete _update;
_selector = _update = NULL;
return false;
}
return true;
}
virtual bool write(Stream &out)
{
int zero=0;
if (_selector == NULL || _update == NULL)
return false;
out.write(zero);
out.writeString(_fullCollectionName);
out.write(_flags);
// Actual query
if (!_selector->write(out)) {
return false;
}
// Update document
if (!_update->write(out)) {
return false;
}
return true;
}
};
class MongoMore : public MongoRequest
{
public:
int _flags;
std::string _fullCollectionName;
int _numberToReturn;
signed long _cursorID;
MongoMore() : _flags(0), _numberToReturn(0), _cursorID(0)
{
_header.opCode = MONGO_OP_GETMORE;
}
~MongoMore()
{
}
virtual bool read(Stream &in)
{
int zero=0;
in.read(zero);
in.readString(_fullCollectionName);
in.read(_numberToReturn);
in.read(_cursorID);
return true;
}
virtual bool write(Stream &out)
{
int zero=0;
out.write(zero);
out.writeString(_fullCollectionName);
out.write(_numberToReturn);
out.write(_cursorID);
return true;
}
};
class MongoDelete : public MongoRequest
{
public:
int _flags;
std::string _fullCollectionName;
MongoDocument *_selector;
MongoDelete() : _flags(0), _selector(0)
{
_header.opCode = MONGO_OP_DELETE;
}
~MongoDelete()
{
if (_selector) delete _selector;
}
virtual bool read(Stream &in)
{
int zero=0;
in.read(zero);
in.readString(_fullCollectionName);
in.read(_flags);
// Actual query
_selector = new MongoDocument();
if (!_selector->read(in)) {
delete _selector;
_selector = NULL;
return false;
}
return true;
}
virtual bool write(Stream &out)
{
int zero=0;
if (_selector == NULL)
return false;
out.write(zero);
out.writeString(_fullCollectionName);
out.write(_flags);
// Actual query
if (!_selector->write(out)) {
return false;
}
return true;
}
};
class MongoKillCursors : public MongoRequest
{
public:
std::vector<signed long> _cursors;
MongoKillCursors()
{
_header.opCode = MONGO_OP_KILL_CURSORS;
}
~MongoKillCursors()
{
}
virtual bool read(Stream &in)
{
int zero=0;
int numCursors=0;
in.read(numCursors);
_cursors.reserve(numCursors);
for (int i=0; i<_cursors.size(); i++)
in.read(_cursors[i]);
return true;
}
virtual bool write(Stream &out)
{
int zero=0;
int numCursors=_cursors.size();
for (int i=0; i<numCursors; i++)
out.write(_cursors[i]);
return true;
}
};
class MongoServiceHandler
{
public:
MongoServiceHandler(StreamSocket& socket, SocketReactor& reactor):
_socket(socket),
_reactor(reactor),
_pBuffer(BUFFER_SIZE),
_readingMessage(false),
_messageSize(0),
_bufferPos(0),
_currentRequestId(0)
{
Application& app = Application::instance();
app.logger().information("Connection from " + socket.peerAddress().toString());
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, ReadableNotification>(*this, &MongoServiceHandler::onReadable));
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, ErrorNotification>(*this, &MongoServiceHandler::onError));
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, TimeoutNotification>(*this, &MongoServiceHandler::onTimeout));
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, IdleNotification>(*this, &MongoServiceHandler::onIdle));
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, ShutdownNotification>(*this, &MongoServiceHandler::onShutdown));
}
~MongoServiceHandler()
{
Application& app = Application::instance();
try
{
app.logger().information("Disconnecting " + _socket.peerAddress().toString());
}
catch (...)
{
}
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, ReadableNotification>(*this, &MongoServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, ErrorNotification>(*this, &MongoServiceHandler::onError));
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, TimeoutNotification>(*this, &MongoServiceHandler::onTimeout));
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, IdleNotification>(*this, &MongoServiceHandler::onIdle));
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, ShutdownNotification>(*this, &MongoServiceHandler::onShutdown));
//delete _pBuffer;
}
int bufferUpBytes()
{
char *bufferPTR = _pBuffer.begin() + _bufferPos;
int bytesLeft = BUFFER_SIZE-_bufferPos;
if (bytesLeft <= 0) {
// Expand buffer?
return -1;
}
int n = _socket.receiveBytes(bufferPTR, bytesLeft);
printf("receiveBytes(%i) == %i\n", bytesLeft, n);
_bufferPos += n;
return n;
}
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
printf("Reading data...\n");
try {
int n = bufferUpBytes();
printf("Grabbed %i bytes...\n", n);
if (n == 0) {
// Disconnected?
delete this;
return;
}
// Keep reading data until exhausted
while (n != 0) {
if (n == -1) {
printf("Buffer exhausted\n");
// TODO: handle
return;
}
printf("Reading MORE data...\n");
if (!_readingMessage) {
// Wait till we get the message header
if (_bufferPos + n >= sizeof(MongoMsgHeader)) {
printf("Parsed header\n");
// Parse header
memcpy(&_currentMessage, _pBuffer.begin(), sizeof(MongoMsgHeader));
// Convert endian
_currentMessage.messageLength = ByteOrder::fromLittleEndian(_currentMessage.messageLength);
_currentMessage.requestID = ByteOrder::fromLittleEndian(_currentMessage.requestID);
_currentMessage.responseTo = ByteOrder::fromLittleEndian(_currentMessage.responseTo);
_currentMessage.opCode = ByteOrder::fromLittleEndian(_currentMessage.opCode);
_readingMessage = true;
if (_currentMessage.messageLength > BUFFER_SIZE) {
printf("Client sent a big message, aborting: %i\n", _currentMessage.messageLength);
_socket.shutdownSend();
return;
}
}
}
if (_readingMessage) {
// Wait until we have read a full message
printf("_pos == %i, message size == %i, header size == %i\n", _bufferPos, _currentMessage.messageLength, sizeof(MongoMsgHeader));
if (_bufferPos >= _currentMessage.messageLength) {
MemoryStream stream(_pBuffer.begin() + sizeof(MongoMsgHeader), _bufferPos - sizeof(MongoMsgHeader));
onMongoMessage(_currentMessage, stream);
// Reset, move extra data to start of buffer
_readingMessage = false;
memcpy(_pBuffer.begin(), _pBuffer.begin() + _bufferPos, _bufferPos - _currentMessage.messageLength);
n = _bufferPos = _bufferPos - _currentMessage.messageLength;
continue;
}
}
n = bufferUpBytes();
}
} catch(Poco::Net::ConnectionResetException &ex)
{
printf("Errm WTF\n");
_socket.shutdownSend();
}
}
void sendResponse(MongoRequest *response)
{
char sendBuffer[4096];
MemoryStream stream(sendBuffer, 4096);
MongoMsgHeader header = response->_header;
response->write(stream);
int size = stream.getPosition();
header.messageLength = ByteOrder::toLittleEndian(sizeof(MongoMsgHeader) + size);
header.requestID = ByteOrder::toLittleEndian(_currentRequestId++);
header.responseTo = ByteOrder::toLittleEndian(header.responseTo);
header.opCode = ByteOrder::toLittleEndian(header.opCode);
printf("Writing response header\n");
_socket.sendBytes(&header, sizeof(MongoMsgHeader));
printf("Writing response of %i bytes\n", size);
_socket.sendBytes(sendBuffer, size);
}
void onMongoMessage(MongoMsgHeader &header, Stream &in)
{
MongoRequest *request = NULL;
printf("Mongo message: %i\n", header.opCode);
switch (header.opCode) {
case MONGO_OP_OP_MSG:
break;
case MONGO_OP_UPDATE:
request = new MongoUpdate();
request->_header = header;
if (request->read(in)) {
// ...
printf("TODO: update\n");
delete request;
}
break;
case MONGO_OP_INSERT:
request = new MongoInsert();
request->_header = header;
if (request->read(in)) {
// ...
printf("TODO: insert\n");
delete request;
}
break;
case MONGO_OP_QUERY:
printf("Construcing MongoQuery\n");
request = new MongoQuery();
request->_header = header;
if (request->read(in)) {
// ...
printf("TODO: Query...\n");
MongoQuery *query = static_cast<MongoQuery*>(request);
printf("Query: %x. Doc: %x, Data: %x\n", query, query->_query, query->_query->_data);
bson_print(&query->_query->_data);
if (query->_returnFieldSelector) bson_print(&query->_returnFieldSelector->_data);
delete request;
// Create response
BSONBuilder doc;
doc.addField("$err", "Not implemented");
doc.finish();
MongoReply *reply = new MongoReply();
reply->_flags = MONGO_REPLY_QUERYFAILURE;
reply->appendDocument(doc.data());
sendResponse(reply);
delete reply;
printf("Next...\n");
}
break;
case MONGO_OP_GETMORE:
request = new MongoMore();
request->_header = header;
if (request->read(in)) {
// ...
printf("TODO: getmore\n");
delete request;
}
break;
case MONGO_OP_DELETE:
request = new MongoDelete();
request->_header = header;
if (request->read(in)) {
// ...
printf("TODO: delete\n");
delete request;
}
break;
case MONGO_OP_KILL_CURSORS:
request = new MongoKillCursors();
request->_header = header;
if (request->read(in)) {
// ...
printf("TODO: killcursors\n");
delete request;
}
break;
default:
// ???!!!
printf("Unhandled message: %i\n", header.opCode);
break;
}
}
void onError(const Poco::AutoPtr<Poco::Net::ErrorNotification>& pNf)
{
printf("Connection error\n");
}
void onTimeout(const Poco::AutoPtr<Poco::Net::TimeoutNotification>& pNf)
{
printf("timeout\n");
_socket.shutdownSend();
}
void onIdle(const Poco::AutoPtr<Poco::Net::IdleNotification>& pNf)
{
printf("idle\n");
_socket.shutdownSend();
}
void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
{
printf("SHUTDOWN\n");
delete this;
}
private:
enum
{
BUFFER_SIZE = 1024
};
StreamSocket _socket;
SocketReactor& _reactor;
Buffer<char> _pBuffer;
bool _readingMessage;
int _messageSize;
int _bufferPos;
int _currentRequestId;
MongoMsgHeader _currentMessage;
};
class MongoServer: public Poco::Util::ServerApplication
{
public:
MongoServer(): _helpRequested(false)
{
}
~MongoServer()
{
}
protected:
void initialize(Application& self)
{
loadConfiguration(); // load default configuration files, if present
ServerApplication::initialize(self);
}
void uninitialize()
{
ServerApplication::uninitialize();
}
void defineOptions(OptionSet& options)
{
ServerApplication::defineOptions(options);
options.addOption(
Option("help", "h", "display help information on command line arguments")
.required(false)
.repeatable(false));
}
void handleOption(const std::string& name, const std::string& value)
{
ServerApplication::handleOption(name, value);
if (name == "help")
_helpRequested = true;
}
void displayHelp()
{
HelpFormatter helpFormatter(options());
helpFormatter.setCommand(commandName());
helpFormatter.setUsage("OPTIONS");
helpFormatter.setHeader("An echo server implemented using the Reactor and Acceptor patterns.");
helpFormatter.format(std::cout);
}
int main(const std::vector<std::string>& args)
{
if (_helpRequested)
{
displayHelp();
}
else
{
// get parameters from configuration file
unsigned short port = (unsigned short) config().getInt("MongoServer.port", 27017);
// set-up a server socket
ServerSocket svs(port);
// set-up a SocketReactor...
SocketReactor reactor;
// ... and a SocketAcceptor
SocketAcceptor<MongoServiceHandler> acceptor(svs, reactor);
// run the reactor in its own thread so that we can wait for
// a termination request
Thread thread;
thread.start(reactor);
// wait for CTRL-C or kill
waitForTerminationRequest();
// Stop the SocketReactor
reactor.stop();
thread.join();
}
return Application::EXIT_OK;
}
private:
bool _helpRequested;
};
int main(int argc, char** argv)
{
MongoServer app;
return app.run(argc, argv);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment