Created
February 5, 2014 01:39
-
-
Save jamesu/8815991 to your computer and use it in GitHub Desktop.
An experiment in implementing the MongoDB server protocol in POCO
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (C) 2013 James S Urquhart. | |
// | |
// Permission is hereby granted, free of charge, to any person obtaining a copy | |
// of this software and associated documentation files (the "Software"), to | |
// deal in the Software without restriction, including without limitation the | |
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or | |
// sell copies of the Software, and to permit persons to whom the Software is | |
// furnished to do so, subject to the following conditions: | |
// | |
// The above copyright notice and this permission notice shall be included in | |
// all copies or substantial portions of the Software. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
// IN THE SOFTWARE. | |
//----------------------------------------------------------------------------- | |
// NOTE: compile with "g++ main.cpp -lPocoNet -lPocoUtil -lPocoFoundation -lmongoc -o test -DMONGO_HAVE_STDINT" | |
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/NetException.h"
#include "Poco/NObserver.h"
#include "Poco/Exception.h"
#include "Poco/Buffer.h"
#include "Poco/ByteOrder.h"
#include "Poco/Thread.h"
#include "Poco/MemoryStream.h"
#include "Poco/Util/ServerApplication.h"
#include "Poco/Util/Option.h"
#include "Poco/Util/OptionSet.h"
#include "Poco/Util/HelpFormatter.h"
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include "mongo.h"
using Poco::Net::SocketReactor;
using Poco::Net::SocketAcceptor;
using Poco::Net::ReadableNotification;
using Poco::Net::ErrorNotification;
using Poco::Net::TimeoutNotification;
using Poco::Net::IdleNotification;
using Poco::Net::ShutdownNotification;
using Poco::Net::ServerSocket;
using Poco::Net::StreamSocket;
using Poco::NObserver;
using Poco::AutoPtr;
using Poco::Thread;
using Poco::Util::ServerApplication;
using Poco::Util::Application;
using Poco::Util::Option;
using Poco::Util::OptionSet;
using Poco::Util::HelpFormatter;
using Poco::Buffer;
using Poco::ByteOrder;
using Poco::MemoryInputStream;
using Poco::MemoryOutputStream;
using Poco::MemoryIOS;
// Opcode values carried in MongoMsgHeader::opCode. Each request class in this
// file stores the matching value in its header when constructed.
#define MONGO_OP_REPLY        1
#define MONGO_OP_OP_MSG       1000
#define MONGO_OP_UPDATE       2001
#define MONGO_OP_INSERT       2002
#define MONGO_OP_RESERVED     2003
#define MONGO_OP_QUERY        2004
#define MONGO_OP_GETMORE      2005
#define MONGO_OP_DELETE       2006
#define MONGO_OP_KILL_CURSORS 2007
// Integer type used for the header fields below.
// NOTE(review): the header layout relies on this being exactly 4 bytes wide.
typedef int int32;

// Fixed-size header that precedes every message on the wire. The handler
// copies it straight out of the receive buffer and then converts each field
// from little-endian byte order.
typedef struct MongoMsgHeader {
    int32 messageLength; // total message size, including this header
    int32 requestID;     // identifier for this message
    int32 responseTo;    // requestID from the original request
                         // (used in responses from the db)
    int32 opCode;        // request type - one of the MONGO_OP_* values
} MongoMsgHeader;
// Need our own Memory stream since POCO's is terrible | |
class Stream | |
{ | |
public: | |
Stream() {;} | |
virtual ~Stream() {;} | |
virtual bool _write(int nBytes, const void *data) = 0; | |
virtual bool _read(int nBytes, void *data) = 0; | |
virtual int getPosition() { return 0; } | |
virtual void setPosition(int pos) { ; } | |
virtual bool isEOF() { return true; } | |
virtual int getStreamSize() { return 0; } | |
bool readString(std::string &str) | |
{ | |
char buffer[1024]; | |
char *ptr = buffer; | |
while (_read(1, ptr)) { | |
if (*ptr++ == '\0') | |
break; | |
} | |
str = buffer; | |
} | |
bool writeString(std::string &str) | |
{ | |
int len = str.length(); | |
return _write(len+1, str.c_str()); | |
} | |
template<class T> bool read(T &var) { T rvar; bool ret = _read(sizeof(T), &rvar); var = ByteOrder::toLittleEndian(rvar); return ret; }; | |
template<class T> bool write(T var) { T wvar = ByteOrder::fromLittleEndian(var); return _write(sizeof(T), &wvar); }; | |
}; | |
class MemoryStream : public Stream | |
{ | |
public: | |
int _curSize; | |
int _curPos; | |
char *_data; | |
MemoryStream(char *buffer, int size) : _curSize(size), _curPos(0) | |
{ | |
_data = buffer; | |
} | |
~MemoryStream() | |
{ | |
//delete[] _data; | |
} | |
bool _write(int nBytes, const void *data) | |
{ | |
int bytesLeft = _curSize - _curPos; | |
int bytesToWrite = bytesLeft > nBytes ? nBytes : bytesLeft; | |
if (bytesToWrite > 0) { | |
memcpy(_data + _curPos, data, bytesToWrite); | |
_curPos += bytesToWrite; | |
} | |
return bytesToWrite == nBytes; | |
} | |
bool _read(int nBytes, void *data) | |
{ | |
int bytesLeft = _curSize - _curPos; | |
int bytesToRead = bytesLeft > nBytes ? nBytes : bytesLeft; | |
if (bytesToRead > 0) { | |
memcpy(data, _data + _curPos, bytesToRead); | |
_curPos += bytesToRead; | |
} | |
return bytesToRead == nBytes; | |
} | |
virtual int getPosition() { return _curPos; } | |
virtual void setPosition(int pos) { if (pos >= 0 && pos <= _curSize) _curPos = pos; } | |
virtual bool isEOF() { return _curPos >= _curSize; } | |
virtual int getStreamSize() { return _curSize; } | |
}; | |
class BSONBuilder | |
{ | |
public: | |
bson _data; | |
BSONBuilder() { bson_init(&_data); } | |
~BSONBuilder() { bson_destroy(&_data); } | |
bson *data() { return &_data; } | |
void addField(const char *name, bson_oid_t *value) | |
{ | |
bson_append_oid(&_data, name, value); | |
} | |
void addField(const char *name, int value) | |
{ | |
bson_append_int(&_data, name, value); | |
} | |
void addField(const char *name, double value) | |
{ | |
bson_append_double(&_data, name, value); | |
} | |
void addField(const char *name, int64_t value) | |
{ | |
bson_append_long(&_data, name, value); | |
} | |
void addField(const char *name, const char *value) | |
{ | |
bson_append_string(&_data, name, value); | |
} | |
void addField(const char *name, char type, char *data, int len) | |
{ | |
bson_append_binary(&_data, name, type, data, len); | |
} | |
void addField(const char *name, bool value) | |
{ | |
bson_append_bool(&_data, name, value); | |
} | |
void addNullField(const char *name) | |
{ | |
bson_append_null(&_data, name); | |
} | |
void addUndefinedField(const char *name) | |
{ | |
bson_append_null(&_data, name); | |
} | |
void addCodeField(const char *name, const char *value, const bson *scope) | |
{ | |
if (scope) | |
bson_append_code_w_scope(&_data, name, value, scope); | |
else | |
bson_append_code(&_data, name, value); | |
} | |
void finish() | |
{ | |
bson_finish(&_data); | |
} | |
}; | |
class MongoDocument | |
{ | |
public: | |
bson _data; | |
char *_bytes; | |
MongoDocument() : _bytes(0) | |
{ | |
} | |
~MongoDocument() | |
{ | |
if (_bytes) bson_destroy(&_data); | |
} | |
void setDocument(bson *data) | |
{ | |
bson_init(&_data); | |
bson_copy(&_data, data); | |
_bytes = (char*)bson_data(&_data); | |
} | |
bool read(Stream &in) | |
{ | |
int size = 0; | |
in.read(size); | |
printf("Document size == %i\n", size); | |
_bytes = new char[size+4]; | |
in.setPosition(in.getPosition()-4); | |
in._read(size, _bytes); | |
bson_init_data(&_data, _bytes); | |
return true; | |
} | |
bool write(Stream &out) | |
{ | |
bson_finish(&_data); | |
_bytes = (char*)bson_data(&_data); | |
out._write(bson_size(&_data), _bytes); | |
return true; | |
} | |
}; | |
class MongoRequest | |
{ | |
public: | |
MongoMsgHeader _header; | |
MongoRequest() {_header.opCode = 0; _header.responseTo = 0;} | |
virtual ~MongoRequest() {;} | |
virtual bool read(Stream &in) = 0; | |
virtual bool write(Stream &out) = 0; | |
}; | |
// Single-bit flag values for a query message (see MongoQuery::_flags).
#define MONGO_QUERY_RESERVED        0x1
#define MONGO_QUERY_TAILABLECURSOR  0x2
#define MONGO_QUERY_SLAVEOK         0x4
#define MONGO_QUERY_OPLOGREPLAY     0x8
#define MONGO_QUERY_NOCURSORTIMEOUT 0x10
#define MONGO_QUERY_AWAITDATA       0x20
#define MONGO_QUERY_EXHAUST         0x40
#define MONGO_QUERY_PARTIAL         0x80
class MongoQuery : public MongoRequest | |
{ | |
public: | |
int _flags; | |
std::string _fullCollectionName; | |
int _numberToSkip; | |
int _numberToReturn; | |
MongoDocument *_query; | |
MongoDocument *_returnFieldSelector; | |
MongoQuery() : _flags(0), _numberToSkip(0), _numberToReturn(0), _query(0), _returnFieldSelector(0) | |
{ | |
_header.opCode = MONGO_OP_QUERY; | |
} | |
~MongoQuery() | |
{ | |
if (_query) delete _query; | |
if (_returnFieldSelector) delete _returnFieldSelector; | |
} | |
virtual bool read(Stream &in) | |
{ | |
printf("READ START %i, size == %i\n", in.getPosition(), in.getStreamSize()); | |
in.read(_flags); | |
in.readString(_fullCollectionName); | |
in.read(_numberToSkip); | |
in.read(_numberToReturn); | |
printf("READ READ %i '%s' %i %i @[%i]\n", _flags, _fullCollectionName.c_str(), _numberToSkip, _numberToReturn, in.getPosition()); | |
// Actual query | |
printf("Reading _query...\n"); | |
_query = new MongoDocument(); | |
if (!_query->read(in)) { | |
delete _query; | |
_query = NULL; | |
return false; | |
} | |
// Optional: returnFieldSelector | |
printf("Position is now %i\n", in.getPosition()); | |
if (!in.isEOF()) { | |
printf("Read returnFieldSelector\n"); | |
_returnFieldSelector = new MongoDocument(); | |
if (!_returnFieldSelector->read(in)) { | |
delete _returnFieldSelector; | |
delete _query; | |
_returnFieldSelector = _query = NULL; | |
return false; | |
} | |
} else { | |
_returnFieldSelector = NULL; | |
} | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
out.write(_flags); | |
out.writeString(_fullCollectionName); | |
out.write(_numberToSkip); | |
out.write(_numberToReturn); | |
// Actual query | |
if (_query == NULL || !_query->write(out)) { | |
return false; | |
} | |
// Optional: returnFieldSelector | |
if (_returnFieldSelector && !_returnFieldSelector->write(out)) { | |
return false; | |
} | |
return true; | |
} | |
}; | |
class MongoInsert : public MongoRequest | |
{ | |
public: | |
int _flags; | |
std::string _fullCollectionName; | |
std::vector<MongoDocument*> _documents; | |
MongoInsert() : _flags(0) | |
{ | |
_header.opCode = MONGO_OP_INSERT; | |
} | |
~MongoInsert() | |
{ | |
for (int i=0; i<_documents.size(); i++) { | |
delete _documents[i]; | |
} | |
} | |
virtual bool read(Stream &in) | |
{ | |
in.read(_flags); | |
in.readString(_fullCollectionName); | |
// Read documents until there are none left | |
while (!in.isEOF()) { | |
MongoDocument *document = new MongoDocument(); | |
if (!document->read(in)) { | |
delete document; | |
break; | |
} | |
_documents.push_back(document); | |
} | |
// No documents read? Fail! | |
if (_documents.size() == 0) | |
return false; | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
out.write(_flags); | |
out.writeString(_fullCollectionName); | |
// Write all the documents | |
for (int i=0; i<_documents.size(); i++) { | |
_documents[i]->write(out); | |
} | |
return true; | |
} | |
}; | |
// Single-bit flag values stored in MongoReply::_flags.
#define MONGO_REPLY_CURSORNOTFOUND   0x1
#define MONGO_REPLY_QUERYFAILURE     0x2
#define MONGO_REPLY_SHARDCONFIGSTATE 0x4
#define MONGO_REPLY_AWAITCAPABLE     0x8
class MongoReply : public MongoRequest | |
{ | |
public: | |
int _flags; | |
signed long _cursorID; | |
int _startingFrom; | |
std::vector<MongoDocument*> _documents; | |
MongoReply() : _flags(0), _cursorID(0), _startingFrom(0) | |
{ | |
_header.opCode = MONGO_OP_REPLY; | |
} | |
~MongoReply() | |
{ | |
for (int i=0; i<_documents.size(); i++) { | |
delete _documents[i]; | |
} | |
} | |
void appendDocument(bson *data) | |
{ | |
MongoDocument *document = new MongoDocument(); | |
document->setDocument(data); | |
_documents.push_back(document); | |
} | |
virtual bool read(Stream &in) | |
{ | |
int numDocuments; | |
in.read(_flags); | |
in.read(_cursorID); | |
in.read(_startingFrom); | |
in.read(numDocuments); | |
// Read numDocuments documents | |
_documents.reserve(numDocuments); | |
for (int i=0; i<numDocuments; i++) { | |
MongoDocument *document = new MongoDocument(); | |
if (!document->read(in)) { | |
delete document; | |
continue; | |
} | |
_documents.push_back(document); | |
} | |
// Incorrect number of documents read? Fail! | |
if (numDocuments != _documents.size()) | |
return false; | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
int numDocuments = _documents.size(); | |
out.write(_flags); | |
out.write(_cursorID); | |
out.write(_startingFrom); | |
out.write(numDocuments); | |
// Write all the documents | |
for (int i=0; i<_documents.size(); i++) { | |
_documents[i]->write(out); | |
} | |
return true; | |
} | |
}; | |
class MongoUpdate : public MongoRequest | |
{ | |
public: | |
int _flags; | |
std::string _fullCollectionName; | |
MongoDocument *_selector; | |
MongoDocument *_update; | |
MongoUpdate() : _flags(0), _selector(0), _update(0) | |
{ | |
_header.opCode = MONGO_OP_UPDATE; | |
} | |
~MongoUpdate() | |
{ | |
if (_selector) delete _selector; | |
if (_update) delete _update; | |
} | |
virtual bool read(Stream &in) | |
{ | |
int zero=0; | |
in.read(zero); | |
in.readString(_fullCollectionName); | |
in.read(_flags); | |
// Actual query | |
_selector = new MongoDocument(); | |
if (!_selector->read(in)) { | |
delete _selector; | |
_selector = NULL; | |
return false; | |
} | |
// Update document | |
_update = new MongoDocument(); | |
if (!_update->read(in)) { | |
delete _selector; | |
delete _update; | |
_selector = _update = NULL; | |
return false; | |
} | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
int zero=0; | |
if (_selector == NULL || _update == NULL) | |
return false; | |
out.write(zero); | |
out.writeString(_fullCollectionName); | |
out.write(_flags); | |
// Actual query | |
if (!_selector->write(out)) { | |
return false; | |
} | |
// Update document | |
if (!_update->write(out)) { | |
return false; | |
} | |
return true; | |
} | |
}; | |
class MongoMore : public MongoRequest | |
{ | |
public: | |
int _flags; | |
std::string _fullCollectionName; | |
int _numberToReturn; | |
signed long _cursorID; | |
MongoMore() : _flags(0), _numberToReturn(0), _cursorID(0) | |
{ | |
_header.opCode = MONGO_OP_GETMORE; | |
} | |
~MongoMore() | |
{ | |
} | |
virtual bool read(Stream &in) | |
{ | |
int zero=0; | |
in.read(zero); | |
in.readString(_fullCollectionName); | |
in.read(_numberToReturn); | |
in.read(_cursorID); | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
int zero=0; | |
out.write(zero); | |
out.writeString(_fullCollectionName); | |
out.write(_numberToReturn); | |
out.write(_cursorID); | |
return true; | |
} | |
}; | |
class MongoDelete : public MongoRequest | |
{ | |
public: | |
int _flags; | |
std::string _fullCollectionName; | |
MongoDocument *_selector; | |
MongoDelete() : _flags(0), _selector(0) | |
{ | |
_header.opCode = MONGO_OP_DELETE; | |
} | |
~MongoDelete() | |
{ | |
if (_selector) delete _selector; | |
} | |
virtual bool read(Stream &in) | |
{ | |
int zero=0; | |
in.read(zero); | |
in.readString(_fullCollectionName); | |
in.read(_flags); | |
// Actual query | |
_selector = new MongoDocument(); | |
if (!_selector->read(in)) { | |
delete _selector; | |
_selector = NULL; | |
return false; | |
} | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
int zero=0; | |
if (_selector == NULL) | |
return false; | |
out.write(zero); | |
out.writeString(_fullCollectionName); | |
out.write(_flags); | |
// Actual query | |
if (!_selector->write(out)) { | |
return false; | |
} | |
return true; | |
} | |
}; | |
class MongoKillCursors : public MongoRequest | |
{ | |
public: | |
std::vector<signed long> _cursors; | |
MongoKillCursors() | |
{ | |
_header.opCode = MONGO_OP_KILL_CURSORS; | |
} | |
~MongoKillCursors() | |
{ | |
} | |
virtual bool read(Stream &in) | |
{ | |
int zero=0; | |
int numCursors=0; | |
in.read(numCursors); | |
_cursors.reserve(numCursors); | |
for (int i=0; i<_cursors.size(); i++) | |
in.read(_cursors[i]); | |
return true; | |
} | |
virtual bool write(Stream &out) | |
{ | |
int zero=0; | |
int numCursors=_cursors.size(); | |
for (int i=0; i<numCursors; i++) | |
out.write(_cursors[i]); | |
return true; | |
} | |
}; | |
class MongoServiceHandler | |
{ | |
public: | |
MongoServiceHandler(StreamSocket& socket, SocketReactor& reactor): | |
_socket(socket), | |
_reactor(reactor), | |
_pBuffer(BUFFER_SIZE), | |
_readingMessage(false), | |
_messageSize(0), | |
_bufferPos(0), | |
_currentRequestId(0) | |
{ | |
Application& app = Application::instance(); | |
app.logger().information("Connection from " + socket.peerAddress().toString()); | |
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, ReadableNotification>(*this, &MongoServiceHandler::onReadable)); | |
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, ErrorNotification>(*this, &MongoServiceHandler::onError)); | |
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, TimeoutNotification>(*this, &MongoServiceHandler::onTimeout)); | |
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, IdleNotification>(*this, &MongoServiceHandler::onIdle)); | |
_reactor.addEventHandler(_socket, NObserver<MongoServiceHandler, ShutdownNotification>(*this, &MongoServiceHandler::onShutdown)); | |
} | |
~MongoServiceHandler() | |
{ | |
Application& app = Application::instance(); | |
try | |
{ | |
app.logger().information("Disconnecting " + _socket.peerAddress().toString()); | |
} | |
catch (...) | |
{ | |
} | |
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, ReadableNotification>(*this, &MongoServiceHandler::onReadable)); | |
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, ErrorNotification>(*this, &MongoServiceHandler::onError)); | |
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, TimeoutNotification>(*this, &MongoServiceHandler::onTimeout)); | |
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, IdleNotification>(*this, &MongoServiceHandler::onIdle)); | |
_reactor.removeEventHandler(_socket, NObserver<MongoServiceHandler, ShutdownNotification>(*this, &MongoServiceHandler::onShutdown)); | |
//delete _pBuffer; | |
} | |
int bufferUpBytes() | |
{ | |
char *bufferPTR = _pBuffer.begin() + _bufferPos; | |
int bytesLeft = BUFFER_SIZE-_bufferPos; | |
if (bytesLeft <= 0) { | |
// Expand buffer? | |
return -1; | |
} | |
int n = _socket.receiveBytes(bufferPTR, bytesLeft); | |
printf("receiveBytes(%i) == %i\n", bytesLeft, n); | |
_bufferPos += n; | |
return n; | |
} | |
void onReadable(const AutoPtr<ReadableNotification>& pNf) | |
{ | |
printf("Reading data...\n"); | |
try { | |
int n = bufferUpBytes(); | |
printf("Grabbed %i bytes...\n", n); | |
if (n == 0) { | |
// Disconnected? | |
delete this; | |
return; | |
} | |
// Keep reading data until exhausted | |
while (n != 0) { | |
if (n == -1) { | |
printf("Buffer exhausted\n"); | |
// TODO: handle | |
return; | |
} | |
printf("Reading MORE data...\n"); | |
if (!_readingMessage) { | |
// Wait till we get the message header | |
if (_bufferPos + n >= sizeof(MongoMsgHeader)) { | |
printf("Parsed header\n"); | |
// Parse header | |
memcpy(&_currentMessage, _pBuffer.begin(), sizeof(MongoMsgHeader)); | |
// Convert endian | |
_currentMessage.messageLength = ByteOrder::fromLittleEndian(_currentMessage.messageLength); | |
_currentMessage.requestID = ByteOrder::fromLittleEndian(_currentMessage.requestID); | |
_currentMessage.responseTo = ByteOrder::fromLittleEndian(_currentMessage.responseTo); | |
_currentMessage.opCode = ByteOrder::fromLittleEndian(_currentMessage.opCode); | |
_readingMessage = true; | |
if (_currentMessage.messageLength > BUFFER_SIZE) { | |
printf("Client sent a big message, aborting: %i\n", _currentMessage.messageLength); | |
_socket.shutdownSend(); | |
return; | |
} | |
} | |
} | |
if (_readingMessage) { | |
// Wait until we have read a full message | |
printf("_pos == %i, message size == %i, header size == %i\n", _bufferPos, _currentMessage.messageLength, sizeof(MongoMsgHeader)); | |
if (_bufferPos >= _currentMessage.messageLength) { | |
MemoryStream stream(_pBuffer.begin() + sizeof(MongoMsgHeader), _bufferPos - sizeof(MongoMsgHeader)); | |
onMongoMessage(_currentMessage, stream); | |
// Reset, move extra data to start of buffer | |
_readingMessage = false; | |
memcpy(_pBuffer.begin(), _pBuffer.begin() + _bufferPos, _bufferPos - _currentMessage.messageLength); | |
n = _bufferPos = _bufferPos - _currentMessage.messageLength; | |
continue; | |
} | |
} | |
n = bufferUpBytes(); | |
} | |
} catch(Poco::Net::ConnectionResetException &ex) | |
{ | |
printf("Errm WTF\n"); | |
_socket.shutdownSend(); | |
} | |
} | |
void sendResponse(MongoRequest *response) | |
{ | |
char sendBuffer[4096]; | |
MemoryStream stream(sendBuffer, 4096); | |
MongoMsgHeader header = response->_header; | |
response->write(stream); | |
int size = stream.getPosition(); | |
header.messageLength = ByteOrder::toLittleEndian(sizeof(MongoMsgHeader) + size); | |
header.requestID = ByteOrder::toLittleEndian(_currentRequestId++); | |
header.responseTo = ByteOrder::toLittleEndian(header.responseTo); | |
header.opCode = ByteOrder::toLittleEndian(header.opCode); | |
printf("Writing response header\n"); | |
_socket.sendBytes(&header, sizeof(MongoMsgHeader)); | |
printf("Writing response of %i bytes\n", size); | |
_socket.sendBytes(sendBuffer, size); | |
} | |
void onMongoMessage(MongoMsgHeader &header, Stream &in) | |
{ | |
MongoRequest *request = NULL; | |
printf("Mongo message: %i\n", header.opCode); | |
switch (header.opCode) { | |
case MONGO_OP_OP_MSG: | |
break; | |
case MONGO_OP_UPDATE: | |
request = new MongoUpdate(); | |
request->_header = header; | |
if (request->read(in)) { | |
// ... | |
printf("TODO: update\n"); | |
delete request; | |
} | |
break; | |
case MONGO_OP_INSERT: | |
request = new MongoInsert(); | |
request->_header = header; | |
if (request->read(in)) { | |
// ... | |
printf("TODO: insert\n"); | |
delete request; | |
} | |
break; | |
case MONGO_OP_QUERY: | |
printf("Construcing MongoQuery\n"); | |
request = new MongoQuery(); | |
request->_header = header; | |
if (request->read(in)) { | |
// ... | |
printf("TODO: Query...\n"); | |
MongoQuery *query = static_cast<MongoQuery*>(request); | |
printf("Query: %x. Doc: %x, Data: %x\n", query, query->_query, query->_query->_data); | |
bson_print(&query->_query->_data); | |
if (query->_returnFieldSelector) bson_print(&query->_returnFieldSelector->_data); | |
delete request; | |
// Create response | |
BSONBuilder doc; | |
doc.addField("$err", "Not implemented"); | |
doc.finish(); | |
MongoReply *reply = new MongoReply(); | |
reply->_flags = MONGO_REPLY_QUERYFAILURE; | |
reply->appendDocument(doc.data()); | |
sendResponse(reply); | |
delete reply; | |
printf("Next...\n"); | |
} | |
break; | |
case MONGO_OP_GETMORE: | |
request = new MongoMore(); | |
request->_header = header; | |
if (request->read(in)) { | |
// ... | |
printf("TODO: getmore\n"); | |
delete request; | |
} | |
break; | |
case MONGO_OP_DELETE: | |
request = new MongoDelete(); | |
request->_header = header; | |
if (request->read(in)) { | |
// ... | |
printf("TODO: delete\n"); | |
delete request; | |
} | |
break; | |
case MONGO_OP_KILL_CURSORS: | |
request = new MongoKillCursors(); | |
request->_header = header; | |
if (request->read(in)) { | |
// ... | |
printf("TODO: killcursors\n"); | |
delete request; | |
} | |
break; | |
default: | |
// ???!!! | |
printf("Unhandled message: %i\n", header.opCode); | |
break; | |
} | |
} | |
void onError(const Poco::AutoPtr<Poco::Net::ErrorNotification>& pNf) | |
{ | |
printf("Connection error\n"); | |
} | |
void onTimeout(const Poco::AutoPtr<Poco::Net::TimeoutNotification>& pNf) | |
{ | |
printf("timeout\n"); | |
_socket.shutdownSend(); | |
} | |
void onIdle(const Poco::AutoPtr<Poco::Net::IdleNotification>& pNf) | |
{ | |
printf("idle\n"); | |
_socket.shutdownSend(); | |
} | |
void onShutdown(const AutoPtr<ShutdownNotification>& pNf) | |
{ | |
printf("SHUTDOWN\n"); | |
delete this; | |
} | |
private: | |
enum | |
{ | |
BUFFER_SIZE = 1024 | |
}; | |
StreamSocket _socket; | |
SocketReactor& _reactor; | |
Buffer<char> _pBuffer; | |
bool _readingMessage; | |
int _messageSize; | |
int _bufferPos; | |
int _currentRequestId; | |
MongoMsgHeader _currentMessage; | |
}; | |
class MongoServer: public Poco::Util::ServerApplication | |
{ | |
public: | |
MongoServer(): _helpRequested(false) | |
{ | |
} | |
~MongoServer() | |
{ | |
} | |
protected: | |
void initialize(Application& self) | |
{ | |
loadConfiguration(); // load default configuration files, if present | |
ServerApplication::initialize(self); | |
} | |
void uninitialize() | |
{ | |
ServerApplication::uninitialize(); | |
} | |
void defineOptions(OptionSet& options) | |
{ | |
ServerApplication::defineOptions(options); | |
options.addOption( | |
Option("help", "h", "display help information on command line arguments") | |
.required(false) | |
.repeatable(false)); | |
} | |
void handleOption(const std::string& name, const std::string& value) | |
{ | |
ServerApplication::handleOption(name, value); | |
if (name == "help") | |
_helpRequested = true; | |
} | |
void displayHelp() | |
{ | |
HelpFormatter helpFormatter(options()); | |
helpFormatter.setCommand(commandName()); | |
helpFormatter.setUsage("OPTIONS"); | |
helpFormatter.setHeader("An echo server implemented using the Reactor and Acceptor patterns."); | |
helpFormatter.format(std::cout); | |
} | |
int main(const std::vector<std::string>& args) | |
{ | |
if (_helpRequested) | |
{ | |
displayHelp(); | |
} | |
else | |
{ | |
// get parameters from configuration file | |
unsigned short port = (unsigned short) config().getInt("MongoServer.port", 27017); | |
// set-up a server socket | |
ServerSocket svs(port); | |
// set-up a SocketReactor... | |
SocketReactor reactor; | |
// ... and a SocketAcceptor | |
SocketAcceptor<MongoServiceHandler> acceptor(svs, reactor); | |
// run the reactor in its own thread so that we can wait for | |
// a termination request | |
Thread thread; | |
thread.start(reactor); | |
// wait for CTRL-C or kill | |
waitForTerminationRequest(); | |
// Stop the SocketReactor | |
reactor.stop(); | |
thread.join(); | |
} | |
return Application::EXIT_OK; | |
} | |
private: | |
bool _helpRequested; | |
}; | |
int main(int argc, char** argv) | |
{ | |
MongoServer app; | |
return app.run(argc, argv); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment