Created
June 18, 2012 21:27
-
-
Save shrijeet/2950832 to your computer and use it in GitHub Desktop.
Asynchbase rpc timeout first cut
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 2fd88425c5848059fbefc7f85ce14858bbbe7775 Mon Sep 17 00:00:00 2001 | |
From: Shrijeet Paliwal <[email protected]> | |
Date: Mon, 18 Jun 2012 14:25:21 -0700 | |
Subject: [PATCH] Support client RPC operation level timeout | |
--- | |
Makefile | 1 + | |
src/GetRequest.java | 24 ++++++++++++++++++++++-- | |
src/HBaseClient.java | 16 ++++++++++++++++ | |
src/HBaseRpc.java | 24 ++++++++++++++++++++++++ | |
src/RegionClient.java | 10 ++++++++++ | |
src/RpcTimeoutException.java | 18 ++++++++++++++++++ | |
6 files changed, 91 insertions(+), 2 deletions(-) | |
create mode 100644 src/RpcTimeoutException.java | |
diff --git a/Makefile b/Makefile | |
index 99d9cb5..f9eebe6 100644 | |
--- a/Makefile | |
+++ b/Makefile | |
@@ -65,6 +65,7 @@ asynchbase_SOURCES := \ | |
src/RemoteException.java \ | |
src/RowLock.java \ | |
src/RowLockRequest.java \ | |
+ src/RpcTimeoutException.java \ | |
src/Scanner.java \ | |
src/SingletonList.java \ | |
src/TableNotFoundException.java \ | |
diff --git a/src/GetRequest.java b/src/GetRequest.java | |
index 1f15a5d..ee3d1ee 100644 | |
--- a/src/GetRequest.java | |
+++ b/src/GetRequest.java | |
@@ -39,7 +39,8 @@ import org.jboss.netty.buffer.ChannelBuffer; | |
*/ | |
public final class GetRequest extends HBaseRpc | |
implements HBaseRpc.HasTable, HBaseRpc.HasKey, | |
- HBaseRpc.HasFamily, HBaseRpc.HasQualifiers { | |
+ HBaseRpc.HasFamily, HBaseRpc.HasQualifiers, | |
+ HBaseRpc.SupportsRpcTimeout { | |
private static final byte[] GET = new byte[] { 'g', 'e', 't' }; | |
private static final byte[] EXISTS = | |
@@ -48,15 +49,29 @@ public final class GetRequest extends HBaseRpc | |
private byte[] family; // TODO(tsuna): Handle multiple families? | |
private byte[][] qualifiers; | |
private long lockid = RowLock.NO_LOCK; | |
+ private int rpctimeout = 0; | |
/** | |
* Constructor. | |
* <strong>These byte arrays will NOT be copied.</strong> | |
* @param table The non-empty name of the table to use. | |
* @param key The row key to get in that table. | |
+ * @param rpctimeout The RPC timeout in milliseconds for this get request. | |
+ * Value of 0 means RPC timeout is disabled. | |
*/ | |
- public GetRequest(final byte[] table, final byte[] key) { | |
+ public GetRequest(final byte[] table, final byte[] key, int rpctimeout) { | |
super(GET, table, key); | |
+ this.rpctimeout = rpctimeout; | |
+ } | |
+ | |
+ /** | |
+ * Constructor. | |
+ * <strong>These byte arrays will NOT be copied.</strong> | |
+ * @param table The non-empty name of the table to use. | |
+ * @param key The row key to get in that table. | |
+ */ | |
+ public GetRequest(final byte[] table, final byte[] key) { | |
+ this(table, key, 0); | |
} | |
/** | |
@@ -182,6 +197,11 @@ public final class GetRequest extends HBaseRpc | |
return qualifiers; | |
} | |
+ @Override | |
+ public int rpctimeout() { | |
+ return rpctimeout; | |
+ } | |
+ | |
public String toString() { | |
final String klass = method() == GET ? "GetRequest" : "Exists"; | |
return super.toStringWithQualifiers(klass, family, qualifiers); | |
diff --git a/src/HBaseClient.java b/src/HBaseClient.java | |
index 2a85755..c56b2ca 100644 | |
--- a/src/HBaseClient.java | |
+++ b/src/HBaseClient.java | |
@@ -1053,6 +1053,20 @@ public final class HBaseClient { | |
if (client != null && client.isAlive()) { | |
request.setRegion(region); | |
final Deferred<Object> d = request.getDeferred(); | |
+ if (request instanceof HBaseRpc.SupportsRpcTimeout | |
+ && ((HBaseRpc.SupportsRpcTimeout) request).rpctimeout() != 0) { | |
+ final class RpcTimer implements TimerTask { | |
+ public void run(final Timeout timeout) { | |
+ request.callback(new RpcTimeoutException(request)); | |
+ } | |
+ public String toString() { | |
+ return "RPC timer for " + request; | |
+ } | |
+ }; | |
+ request.rpctimeout = timer.newTimeout(new RpcTimer(), | |
+ ((HBaseRpc.SupportsRpcTimeout) request).rpctimeout(), | |
+ MILLISECONDS); | |
+ } | |
client.sendRpc(request); | |
return d; | |
} | |
@@ -1060,6 +1074,8 @@ public final class HBaseClient { | |
return locateRegion(table, key).addBothDeferring(new RetryRpc()); | |
} | |
+ | |
+ | |
/** | |
* Returns how many lookups in {@code -ROOT-} were performed. | |
* <p> | |
diff --git a/src/HBaseRpc.java b/src/HBaseRpc.java | |
index dd5654c..35ae91f 100644 | |
--- a/src/HBaseRpc.java | |
+++ b/src/HBaseRpc.java | |
@@ -29,6 +29,7 @@ package org.hbase.async; | |
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.jboss.netty.buffer.ChannelBuffers; | |
import org.jboss.netty.util.CharsetUtil; | |
+import org.jboss.netty.util.Timeout; | |
import com.stumbleupon.async.Deferred; | |
@@ -136,6 +137,15 @@ public abstract class HBaseRpc { | |
public long timestamp(); | |
} | |
+ /** | |
+ * An RPC that supports RPC timeout | |
+ * @since 1.3.1 | |
+ */ | |
+ public interface SupportsRpcTimeout { | |
+ /** Returns the RPC timeout in milliseconds to be applied to this RPC. */ | |
+ public int rpctimeout(); | |
+ } | |
+ | |
/* | |
* This class, although it's part of the public API, is mostly here to make | |
* it easier for this library to manipulate the HBase RPC protocol. | |
@@ -348,6 +358,11 @@ public abstract class HBaseRpc { | |
byte attempt; // package-private for RegionClient and HBaseClient only. | |
/** | |
+ * RPC timeout, null if this RPC does not support RPC timeouts. | |
+ */ | |
+ Timeout rpctimeout; | |
+ | |
+ /** | |
* Package private constructor for RPCs that aren't for any region. | |
* @param method The name of the method to invoke on the RegionServer. | |
*/ | |
@@ -430,6 +445,15 @@ public abstract class HBaseRpc { | |
d.callback(result); | |
} | |
+ /* | |
+ * Cancels the RPC timeout. | |
+ */ | |
+ final void cancelRpcTimout() { | |
+ if (rpctimeout != null) { | |
+ rpctimeout.cancel(); | |
+ } | |
+ } | |
+ | |
/** Checks whether or not this RPC has a Deferred without creating one. */ | |
final boolean hasDeferred() { | |
return deferred != null; | |
diff --git a/src/RegionClient.java b/src/RegionClient.java | |
index 5b21653..d36b6f3 100644 | |
--- a/src/RegionClient.java | |
+++ b/src/RegionClient.java | |
@@ -1109,6 +1109,16 @@ final class RegionClient extends ReplayingDecoder<VoidEnum> { | |
} | |
try { | |
+ if (rpc instanceof HBaseRpc.SupportsRpcTimeout) { | |
+ // In case timeout has expired, the response should just be silently | |
+ // ignored as no processing / handling of it can occur, because we | |
+ // already gave a RpcTimeoutException to the Deferred. | |
+ if (rpc.rpctimeout.isExpired()) { | |
+ return null; | |
+ } else { | |
+ rpc.cancelRpcTimout(); | |
+ } | |
+ } | |
rpc.callback(decoded); | |
} catch (Exception e) { | |
LOG.error("Unexpected exception while handling RPC #" + rpcid | |
diff --git a/src/RpcTimeoutException.java b/src/RpcTimeoutException.java | |
new file mode 100644 | |
index 0000000..db9dd6b | |
--- /dev/null | |
+++ b/src/RpcTimeoutException.java | |
@@ -0,0 +1,18 @@ | |
+package org.hbase.async; | |
+ | |
+public class RpcTimeoutException extends RecoverableException { | |
+ | |
+ final HBaseRpc timedout_rpc; | |
+ | |
+ RpcTimeoutException(HBaseRpc rpc) { | |
+ super("RPC Timed out " + rpc); | |
+ this.timedout_rpc = rpc; | |
+ } | |
+ | |
+ public HBaseRpc getTimedOutRpc() { | |
+ return timedout_rpc; | |
+ } | |
+ | |
+ private static final long serialVersionUID = -4253496236526944416L; | |
+ | |
+} | |
-- | |
1.7.7.5 (Apple Git-26) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment