Created
June 9, 2011 05:59
-
-
Save toddlipcon/1016144 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/src/java/org/apache/hadoop/hdfs/BlockReader.java b/src/java/org/apache/hadoop/hdfs/BlockReader.java | |
| index 9adedef..ff85af5 100644 | |
| --- a/src/java/org/apache/hadoop/hdfs/BlockReader.java | |
| +++ b/src/java/org/apache/hadoop/hdfs/BlockReader.java | |
| @@ -95,6 +95,7 @@ public class BlockReader extends FSInputChecker { | |
| private final long bytesNeededToFinish; | |
| private boolean eos = false; | |
| + private boolean sentStatusCode = false; | |
| byte[] skipBuf = null; | |
| ByteBuffer checksumBytes = null; | |
| @@ -483,11 +484,11 @@ public class BlockReader extends FSInputChecker { | |
| } | |
| /** | |
| - * Whether the BlockReader has reached the end of its input stream, i.e. | |
| - * whether it has consumed all the data sent by the DN. | |
| + * Whether the BlockReader has reached the end of its input stream | |
| + * and successfully sent a status code back to the datanode. | |
| */ | |
| - public boolean hasConsumedAll() { | |
| - return eos; | |
| + public boolean hasSentStatusCode() { | |
| + return sentStatusCode; | |
| } | |
| /** | |
| @@ -497,10 +498,12 @@ public class BlockReader extends FSInputChecker { | |
| * data correctness. | |
| */ | |
| void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) { | |
| + assert !sentStatusCode : "already sent status code to " + sock; | |
| try { | |
| OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT); | |
| statusCode.writeOutputStream(out); | |
| out.flush(); | |
| + sentStatusCode = true; | |
| } catch (IOException e) { | |
| // It's ok not to be able to send this. But something is probably wrong. | |
| LOG.info("Could not send read status (" + statusCode + ") to datanode " + | |
| diff --git a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java | |
| index f498a4a..57052af 100644 | |
| --- a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java | |
| +++ b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java | |
| @@ -713,7 +713,7 @@ public class DFSInputStream extends FSInputStream { | |
| * Close the given BlockReader and cache its socket. | |
| */ | |
| private void closeBlockReader(BlockReader reader) throws IOException { | |
| - if (reader.hasConsumedAll()) { | |
| + if (reader.hasSentStatusCode()) { | |
| Socket oldSock = reader.takeSocket(); | |
| socketCache.put(oldSock); | |
| } | |
| diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java | |
| index 2d9645c..311b724 100644 | |
| --- a/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java | |
| +++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java | |
| @@ -81,7 +81,7 @@ public class TestConnCache { | |
| reader = (BlockReader) invocation.callRealMethod(); | |
| if (sock == null) { | |
| sock = reader.dnSock; | |
| - } else if (prevReader != null && prevReader.hasConsumedAll()) { | |
| + } else if (prevReader != null && prevReader.hasSentStatusCode()) { | |
| // Can't reuse socket if the previous BlockReader didn't read till EOS. | |
| assertSame("DFSInputStream should use the same socket", | |
| sock, reader.dnSock); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment