Skip to content

Instantly share code, notes, and snippets.

@toddlipcon
Created June 9, 2011 05:59
Show Gist options
  • Select an option

  • Save toddlipcon/1016144 to your computer and use it in GitHub Desktop.

Select an option

Save toddlipcon/1016144 to your computer and use it in GitHub Desktop.
diff --git a/src/java/org/apache/hadoop/hdfs/BlockReader.java b/src/java/org/apache/hadoop/hdfs/BlockReader.java
index 9adedef..ff85af5 100644
--- a/src/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/src/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -95,6 +95,7 @@ public class BlockReader extends FSInputChecker {
private final long bytesNeededToFinish;
private boolean eos = false;
+ private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
@@ -483,11 +484,11 @@ public class BlockReader extends FSInputChecker {
}
/**
- * Whether the BlockReader has reached the end of its input stream, i.e.
- * whether it has consumed all the data sent by the DN.
+ * Whether the BlockReader has reached the end of its input stream
+ * and successfully sent a status code back to the datanode.
*/
- public boolean hasConsumedAll() {
- return eos;
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
}
/**
@@ -497,10 +498,12 @@ public class BlockReader extends FSInputChecker {
* data correctness.
*/
void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
statusCode.writeOutputStream(out);
out.flush();
+ sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
diff --git a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index f498a4a..57052af 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -713,7 +713,7 @@ public class DFSInputStream extends FSInputStream {
* Close the given BlockReader and cache its socket.
*/
private void closeBlockReader(BlockReader reader) throws IOException {
- if (reader.hasConsumedAll()) {
+ if (reader.hasSentStatusCode()) {
Socket oldSock = reader.takeSocket();
socketCache.put(oldSock);
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
index 2d9645c..311b724 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
@@ -81,7 +81,7 @@ public class TestConnCache {
reader = (BlockReader) invocation.callRealMethod();
if (sock == null) {
sock = reader.dnSock;
- } else if (prevReader != null && prevReader.hasConsumedAll()) {
+ } else if (prevReader != null && prevReader.hasSentStatusCode()) {
// Can't reuse socket if the previous BlockReader didn't read till EOS.
assertSame("DFSInputStream should use the same socket",
sock, reader.dnSock);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment