Created
February 15, 2012 12:23
-
-
Save tinnefeld/1835351 to your computer and use it in GitHub Desktop.
Increment Operator for RAMCloud Patch file for 7d19140
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From fa6b3028135f535393ea67ca4b6db9bec8bc48d0 Mon Sep 17 00:00:00 2001 | |
From: Christian Tinnefeld <[email protected]> | |
Date: Tue, 14 Feb 2012 12:17:02 -0500 | |
Subject: [PATCH] Added increment operator | |
--- | |
GNUmakefile | 2 +- | |
scripts/rawmetrics.py | 1 + | |
src/MasterClient.cc | 30 ++++++++++++++++++++++++ | |
src/MasterClient.h | 3 ++ | |
src/MasterService.cc | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ | |
src/MasterService.h | 3 ++ | |
src/RamCloud.cc | 10 ++++++++ | |
src/RamCloud.h | 3 ++ | |
src/Rpc.cc | 1 + | |
src/Rpc.h | 21 ++++++++++++++++- | |
src/RpcTest.cc | 2 +- | |
src/ServerMain.cc | 2 +- | |
12 files changed, 134 insertions(+), 4 deletions(-) | |
diff --git a/GNUmakefile b/GNUmakefile | |
index 21b1435..c090711 100644 | |
--- a/GNUmakefile | |
+++ b/GNUmakefile | |
@@ -83,7 +83,7 @@ DOXYGEN ?= doxygen | |
# Test whether Infiniband support is available. Avoids using $(COMFLAGS) | |
# (particularly, -MD) which results in bad interactions with mergedeps. | |
-INFINIBAND = $(shell $(CXX) $(INCLUDES) $(EXTRACXXFLAGS) $(LIBS) -libverbs \ | |
+INFINIBAND = no #$(shell $(CXX) $(INCLUDES) $(EXTRACXXFLAGS) $(LIBS) -libverbs \ | |
-o /dev/null src/HaveInfiniband.cc \ | |
>/dev/null 2>&1 \ | |
&& echo yes || echo no) | |
diff --git a/scripts/rawmetrics.py b/scripts/rawmetrics.py | |
index 3bf23ce..5ceee9f 100755 | |
--- a/scripts/rawmetrics.py | |
+++ b/scripts/rawmetrics.py | |
@@ -327,6 +327,7 @@ rpc.metric('openTableTicks', 'time spent executing OPEN_TABLE RPC') | |
rpc.metric('dropTableTicks', 'time spent executing DROP_TABLE RPC') | |
rpc.metric('createTicks', 'time spent executing CREATE RPC') | |
rpc.metric('readTicks', 'time spent executing READ RPC') | |
+rpc.metric('incrementTicks', 'time spent executing INCREMENT RPC') | |
rpc.metric('writeTicks', 'time spent executing WRITE RPC') | |
rpc.metric('removeTicks', 'time spent executing REMOVE RPC') | |
rpc.metric('enlistServerTicks', 'time spent executing ENLIST_SERVER RPC') | |
diff --git a/src/MasterClient.cc b/src/MasterClient.cc | |
index c2d162c..acf93c0 100644 | |
--- a/src/MasterClient.cc | |
+++ b/src/MasterClient.cc | |
@@ -318,6 +318,36 @@ MasterClient::read(uint32_t tableId, uint64_t id, Buffer* value, | |
Read(*this, tableId, id, value, rejectRules, version)(); | |
} | |
+void | |
+MasterClient::increment(uint32_t tableId, uint64_t id, Buffer* value, | |
+ const RejectRules* rejectRules, uint64_t* version) | |
+{ | |
+ Buffer req; | |
+ | |
+ IncrementRpc::Request& reqHdr(allocHeader<IncrementRpc>(req)); | |
+ reqHdr.tableId = tableId; | |
+ reqHdr.id = id; | |
+ reqHdr.rejectRules = rejectRules ? *rejectRules : defaultRejectRules; | |
+ | |
+ value->reset(); | |
+ | |
+ AsyncState state = send<IncrementRpc>(session, | |
+ req, | |
+ *value); | |
+ | |
+ const IncrementRpc::Response& respHdr(recv<IncrementRpc>(state)); | |
+ if (version != NULL) | |
+ *version = respHdr.version; | |
+ | |
+ value->truncateFront(sizeof(respHdr)); | |
+ assert(respHdr.length == value->getTotalLength()); | |
+ | |
+ checkStatus(HERE); | |
+} | |
+ | |
+ | |
+ | |
+ | |
/** | |
* Read the current contents of multiple objects. | |
* | |
diff --git a/src/MasterClient.h b/src/MasterClient.h | |
index 358ddba..73d409b 100644 | |
--- a/src/MasterClient.h | |
+++ b/src/MasterClient.h | |
@@ -196,6 +196,9 @@ class MasterClient : public Client { | |
uint64_t create(uint32_t tableId, const void* buf, uint32_t length, | |
uint64_t* version = NULL, bool async = false); | |
void fillWithTestData(uint32_t numObjects, uint32_t objectSize); | |
+ void increment(uint32_t tableId, uint64_t id, Buffer* value, | |
+ const RejectRules* rejectRules = NULL, | |
+ uint64_t* version = NULL); | |
void multiRead(std::vector<ReadObject*> requests); | |
void read(uint32_t tableId, uint64_t id, Buffer* value, | |
const RejectRules* rejectRules = NULL, | |
diff --git a/src/MasterService.cc b/src/MasterService.cc | |
index 6c11f0c..8ab9770 100644 | |
--- a/src/MasterService.cc | |
+++ b/src/MasterService.cc | |
@@ -153,6 +153,10 @@ MasterService::dispatch(RpcOpcode opcode, Rpc& rpc) | |
callHandler<FillWithTestDataRpc, MasterService, | |
&MasterService::fillWithTestData>(rpc); | |
break; | |
+ case IncrementRpc::opcode: | |
+ callHandler<IncrementRpc, MasterService, | |
+ &MasterService::increment>(rpc); | |
+ break; | |
case MultiReadRpc::opcode: | |
callHandler<MultiReadRpc, MasterService, | |
&MasterService::multiRead>(rpc); | |
@@ -286,6 +290,62 @@ MasterService::fillWithTestData(const FillWithTestDataRpc::Request& reqHdr, | |
} | |
/** | |
+ * Top-level server method to handle the INCREMENT request. | |
+ * \copydetails create | |
+ */ | |
+void | |
+MasterService::increment(const IncrementRpc::Request& reqHdr, | |
+ IncrementRpc::Response& respHdr, | |
+ Rpc& rpc) | |
+{ | |
+ // We must return table doesn't exist if the table does not exist. Also, we | |
+ // might have an entry in the hash table that's invalid because its tablet | |
+ // no longer lives here. | |
+ if (getTable(reqHdr.tableId, reqHdr.id) == NULL) { | |
+ respHdr.common.status = STATUS_TABLE_DOESNT_EXIST; | |
+ return; | |
+ } | |
+ | |
+ LogEntryHandle handle = objectMap.lookup(reqHdr.tableId, reqHdr.id); | |
+ if (handle == NULL || handle->type() != LOG_ENTRY_TYPE_OBJ) { | |
+ respHdr.common.status = STATUS_OBJECT_DOESNT_EXIST; | |
+ return; | |
+ } | |
+ | |
+ const Object* obj = handle->userData<Object>(); | |
+ respHdr.version = obj->version; | |
+ Status status = rejectOperation(reqHdr.rejectRules, obj->version); | |
+ if (status != STATUS_OK) { | |
+ respHdr.common.status = status; | |
+ return; | |
+ } | |
+ | |
+ uint64_t value = 0; | |
+ if (obj->dataLength(handle->length())==sizeof(int64_t)) { | |
+ | |
+ value = *((int64_t*)obj->data); | |
+ value++; | |
+ Buffer tmpBuffer; | |
+ Buffer::Chunk::appendToBuffer(&tmpBuffer, &value, sizeof(int64_t)); | |
+ | |
+ Status status = storeData(reqHdr.tableId, reqHdr.id, &reqHdr.rejectRules, | |
+ &tmpBuffer, 0, | |
+ static_cast<uint32_t>(sizeof(int64_t)), | |
+ &respHdr.version, true); | |
+ if (status != STATUS_OK) { | |
+ respHdr.common.status = status; | |
+ return; | |
+ } | |
+ } | |
+ | |
+ Buffer::Chunk::appendToBuffer(&rpc.replyPayload, | |
+ &value, sizeof(uint64_t)); | |
+ // TODO(ongaro): We'll need a new type of Chunk to block the cleaner | |
+ // from scribbling over obj->data. | |
+ respHdr.length = obj->dataLength(handle->length()); | |
+} | |
+ | |
+/** | |
* Top-level server method to handle the MULTIREAD request. | |
* | |
* \copydetails Service::ping | |
diff --git a/src/MasterService.h b/src/MasterService.h | |
index 2cb5232..f93d190 100644 | |
--- a/src/MasterService.h | |
+++ b/src/MasterService.h | |
@@ -91,6 +91,9 @@ class MasterService : public Service { | |
void fillWithTestData(const FillWithTestDataRpc::Request& reqHdr, | |
FillWithTestDataRpc::Response& respHdr, | |
Rpc& rpc); | |
+ void increment(const IncrementRpc::Request& reqHdr, | |
+ IncrementRpc::Response& respHdr, | |
+ Rpc& rpc); | |
void multiRead(const MultiReadRpc::Request& reqHdr, | |
MultiReadRpc::Response& respHdr, | |
Rpc& rpc); | |
diff --git a/src/RamCloud.cc b/src/RamCloud.cc | |
index 9b46f92..1493fc5 100644 | |
--- a/src/RamCloud.cc | |
+++ b/src/RamCloud.cc | |
@@ -213,6 +213,16 @@ RamCloud::read(uint32_t tableId, uint64_t id, Buffer* value, | |
return Read(*this, tableId, id, value, rejectRules, version)(); | |
} | |
+/// \copydoc MasterClient::read | |
+void | |
+RamCloud::increment(uint32_t tableId, uint64_t id, Buffer* value, | |
+ const RejectRules* rejectRules, uint64_t* version) | |
+{ | |
+ Context::Guard _(clientContext); | |
+ MasterClient master(objectFinder.lookupHead(tableId)); | |
+ master.increment(tableId, id, value, rejectRules, version); | |
+} | |
+ | |
/** | |
* Read the current contents of multiple objects. | |
* | |
diff --git a/src/RamCloud.h b/src/RamCloud.h | |
index db2649f..0e1af99 100644 | |
--- a/src/RamCloud.h | |
+++ b/src/RamCloud.h | |
@@ -178,6 +178,9 @@ class RamCloud { | |
void read(uint32_t tableId, uint64_t id, Buffer* value, | |
const RejectRules* rejectRules = NULL, | |
uint64_t* version = NULL); | |
+ void increment(uint32_t tableId, uint64_t id, Buffer* value, | |
+ const RejectRules* rejectRules = NULL, | |
+ uint64_t* version = NULL); | |
void multiRead(MasterClient::ReadObject* requests[], uint32_t numRequests); | |
void remove(uint32_t tableId, uint64_t id, | |
const RejectRules* rejectRules = NULL, | |
diff --git a/src/Rpc.cc b/src/Rpc.cc | |
index f5a93bd..6f7cade 100644 | |
--- a/src/Rpc.cc | |
+++ b/src/Rpc.cc | |
@@ -84,6 +84,7 @@ Rpc::opcodeSymbol(uint32_t opcode) | |
case REQUEST_SERVER_LIST: return "REQUEST_SERVER_LIST"; | |
case GET_SERVER_ID: return "GET_SERVER_ID"; | |
case ILLEGAL_RPC_TYPE: return "ILLEGAL_RPC_TYPE"; | |
+ case INCREMENT: return "INCREMENT"; | |
} | |
// Never heard of this RPC; return the numeric value. The shared buffer | |
diff --git a/src/Rpc.h b/src/Rpc.h | |
index 2d331d6..59198f9 100644 | |
--- a/src/Rpc.h | |
+++ b/src/Rpc.h | |
@@ -96,7 +96,8 @@ enum RpcOpcode { | |
UPDATE_SERVER_LIST = 38, | |
REQUEST_SERVER_LIST = 39, | |
GET_SERVER_ID = 40, | |
- ILLEGAL_RPC_TYPE = 41, // 1 + the highest legitimate RpcOpcode | |
+ INCREMENT = 41, | |
+ ILLEGAL_RPC_TYPE = 42, // 1 + the highest legitimate RpcOpcode | |
}; | |
/** | |
@@ -161,6 +162,24 @@ struct FillWithTestDataRpc { | |
} __attribute__((packed)); | |
}; | |
+struct IncrementRpc { | |
+ static const RpcOpcode opcode = INCREMENT; | |
+ static const ServiceType service = MASTER_SERVICE; | |
+ struct Request { | |
+ RpcRequestCommon common; | |
+ uint32_t tableId; | |
+ uint64_t id; | |
+ RejectRules rejectRules; | |
+ } __attribute__((packed)); | |
+ struct Response { | |
+ RpcResponseCommon common; | |
+ uint64_t version; | |
+ uint32_t length; // Length of the object's value in bytes. | |
+ // The actual bytes of the object follow | |
+ // immediately after this header. | |
+ } __attribute__((packed)); | |
+}; | |
+ | |
struct MultiReadRpc { | |
static const RpcOpcode opcode = MULTI_READ; | |
static const ServiceType service = MASTER_SERVICE; | |
diff --git a/src/RpcTest.cc b/src/RpcTest.cc | |
index 7bb3716..87325cc 100644 | |
--- a/src/RpcTest.cc | |
+++ b/src/RpcTest.cc | |
@@ -44,7 +44,7 @@ TEST_F(RpcTest, opcodeSymbol_integer) { | |
EXPECT_STREQ("ILLEGAL_RPC_TYPE", Rpc::opcodeSymbol(ILLEGAL_RPC_TYPE)); | |
// Test out-of-range values. | |
- EXPECT_STREQ("unknown(42)", Rpc::opcodeSymbol(ILLEGAL_RPC_TYPE+1)); | |
+ EXPECT_STREQ("unknown(43)", Rpc::opcodeSymbol(ILLEGAL_RPC_TYPE+1)); | |
// Make sure the next-to-last value is defined (this will fail if | |
// someone adds a new opcode and doesn't update opcodeSymbol). | |
diff --git a/src/ServerMain.cc b/src/ServerMain.cc | |
index d71ced5..177ff69 100644 | |
--- a/src/ServerMain.cc | |
+++ b/src/ServerMain.cc | |
@@ -118,7 +118,7 @@ main(int argc, char *argv[]) | |
const string localLocator = optionParser.options.getLocalLocator(); | |
- InfRcTransport<>::setName(localLocator.c_str()); | |
+ //InfRcTransport<>::setName(localLocator.c_str()); | |
Context::get().transportManager->setTimeout( | |
optionParser.options.getTransportTimeout()); | |
Context::get().transportManager->initialize(localLocator.c_str()); | |
-- | |
1.7.3.2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment