Created July 26, 2011 21:47
Save toddlipcon/1108150 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| commit 207d7caf5dba7f2fe8df905d1d25971b7be700e3 | |
| Author: Todd Lipcon <todd@cloudera.com> | |
| Date: Tue Jul 26 14:45:46 2011 -0700 | |
| refactor edits log buffering to new class | |
| diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java | |
| index 2685e69..5e9d539 100644 | |
| --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java | |
| +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java | |
| @@ -17,18 +17,14 @@ | |
| */ | |
| package org.apache.hadoop.hdfs.server.namenode; | |
| -import java.io.DataOutputStream; | |
| -import java.io.ByteArrayOutputStream; | |
| import java.io.IOException; | |
| import java.net.InetSocketAddress; | |
| -import java.util.ArrayList; | |
| import org.apache.hadoop.hdfs.HdfsConfiguration; | |
| import org.apache.hadoop.hdfs.server.common.Storage; | |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; | |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; | |
| import org.apache.hadoop.io.DataOutputBuffer; | |
| -import org.apache.hadoop.io.Writable; | |
| import org.apache.hadoop.ipc.RPC; | |
| import org.apache.hadoop.net.NetUtils; | |
| @@ -46,21 +42,9 @@ class EditLogBackupOutputStream extends EditLogOutputStream { | |
| private NamenodeProtocol backupNode; // RPC proxy to backup node | |
| private NamenodeRegistration bnRegistration; // backup node registration | |
| private NamenodeRegistration nnRegistration; // active node registration | |
| - private ArrayList<BufferedOp> bufCurrent; // current buffer for writing | |
| - private ArrayList<BufferedOp> bufReady; // buffer ready for flushing | |
| + private EditsDoubleBuffer doubleBuf; | |
| private DataOutputBuffer out; // serialized output sent to backup node | |
| - | |
| - private static class BufferedOp { | |
| - public final FSEditLogOpCodes opCode; | |
| - public final byte[] bytes; | |
| - | |
| - public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) { | |
| - this.opCode = opCode; | |
| - this.bytes = bytes; | |
| - } | |
| - } | |
| - | |
| EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node | |
| NamenodeRegistration nnReg) // active name-node | |
| throws IOException { | |
| @@ -78,8 +62,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream { | |
| Storage.LOG.error("Error connecting to: " + bnAddress, e); | |
| throw e; | |
| } | |
| - this.bufCurrent = new ArrayList<BufferedOp>(); | |
| - this.bufReady = new ArrayList<BufferedOp>(); | |
| + this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); | |
| this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE); | |
| } | |
| @@ -95,13 +78,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream { | |
| @Override // EditLogOutputStream | |
| void write(FSEditLogOp op) throws IOException { | |
| - ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
| - DataOutputStream s = new DataOutputStream(baos); | |
| - FSEditLogOp.Writer w = new FSEditLogOp.Writer(s); | |
| - w.writeOp(op); | |
| - | |
| - bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray())); | |
| - } | |
| + doubleBuf.writeOp(op); | |
| + } | |
| @Override | |
| void writeRaw(byte[] bytes, int offset, int length) throws IOException { | |
| @@ -113,55 +91,37 @@ class EditLogBackupOutputStream extends EditLogOutputStream { | |
| */ | |
| @Override // EditLogOutputStream | |
| void create() throws IOException { | |
| - bufCurrent.clear(); | |
| - assert bufReady.size() == 0 : "previous data is not flushed yet"; | |
| + assert doubleBuf.isFlushed() : "previous data is not flushed yet"; | |
| + this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); | |
| } | |
| @Override // EditLogOutputStream | |
| public void close() throws IOException { | |
| // close should have been called after all pending transactions | |
| // have been flushed & synced. | |
| - int size = bufCurrent.size(); | |
| + int size = doubleBuf.countBufferedBytes(); | |
| if (size != 0) { | |
| throw new IOException("BackupEditStream has " + size + | |
| " records still to be flushed and cannot be closed."); | |
| } | |
| RPC.stopProxy(backupNode); // stop the RPC threads | |
| - bufCurrent = bufReady = null; | |
| + doubleBuf.close(); | |
| + doubleBuf = null; | |
| } | |
| @Override // EditLogOutputStream | |
| void setReadyToFlush() throws IOException { | |
| - assert bufReady.size() == 0 : "previous data is not flushed yet"; | |
| - ArrayList<BufferedOp> tmp = bufReady; | |
| - bufReady = bufCurrent; | |
| - bufCurrent = tmp; | |
| + doubleBuf.setReadyToFlush(); | |
| } | |
| @Override // EditLogOutputStream | |
| protected void flushAndSync() throws IOException { | |
| - assert out.size() == 0 : "Output buffer is not empty"; | |
| - int bufReadySize = bufReady.size(); | |
| - for(int idx = 0; idx < bufReadySize; idx++) { | |
| - BufferedOp jRec = null; | |
| - for(; idx < bufReadySize; idx++) { | |
| - jRec = bufReady.get(idx); | |
| - if(jRec.opCode.getOpCode() | |
| - >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode()) | |
| - break; // special operation should be sent in a separate call to BN | |
| - out.write(jRec.bytes, 0, jRec.bytes.length); | |
| - } | |
| - if(out.size() > 0) | |
| - send(NamenodeProtocol.JA_JOURNAL); | |
| - if(idx == bufReadySize) | |
| - break; | |
| - // operation like start journal spool or increment checkpoint time | |
| - // is a separate call to BN | |
| - out.write(jRec.bytes, 0, jRec.bytes.length); | |
| - send(jRec.opCode.getOpCode()); | |
| + // XXX: this code won't work in trunk, but it's redone | |
| + // in HDFS-1073 where it's simpler. | |
| + doubleBuf.flushTo(out); | |
| + if (out.size() > 0) { | |
| + send(NamenodeProtocol.JA_JOURNAL); | |
| } | |
| - bufReady.clear(); // erase all data in the buffer | |
| - out.reset(); // reset buffer to the start position | |
| } | |
| /** | |
| diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java | |
| index 74f5883..4413001 100644 | |
| --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java | |
| +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java | |
| @@ -24,12 +24,9 @@ import java.io.IOException; | |
| import java.io.RandomAccessFile; | |
| import java.nio.ByteBuffer; | |
| import java.nio.channels.FileChannel; | |
| -import java.util.zip.Checksum; | |
| import org.apache.hadoop.hdfs.protocol.FSConstants; | |
| -import org.apache.hadoop.io.DataOutputBuffer; | |
| import org.apache.hadoop.io.IOUtils; | |
| -import org.apache.hadoop.io.Writable; | |
| import com.google.common.annotations.VisibleForTesting; | |
| @@ -43,10 +40,7 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| private File file; | |
| private FileOutputStream fp; // file stream for storing edit logs | |
| private FileChannel fc; // channel of the file stream for sync | |
| - private DataOutputBuffer bufCurrent; // current buffer for writing | |
| - private DataOutputBuffer bufReady; // buffer ready for flushing | |
| - private FSEditLogOp.Writer writer; | |
| - final private int initBufferSize; // inital buffer size | |
| + private EditsDoubleBuffer doubleBuf; | |
| static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB | |
| static { | |
| @@ -68,10 +62,7 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| EditLogFileOutputStream(File name, int size) throws IOException { | |
| super(); | |
| file = name; | |
| - initBufferSize = size; | |
| - bufCurrent = new DataOutputBuffer(size); | |
| - bufReady = new DataOutputBuffer(size); | |
| - writer = new FSEditLogOp.Writer(bufCurrent); | |
| + doubleBuf = new EditsDoubleBuffer(size); | |
| RandomAccessFile rp = new RandomAccessFile(name, "rw"); | |
| fp = new FileOutputStream(rp.getFD()); // open for append | |
| fc = rp.getChannel(); | |
| @@ -91,23 +82,13 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| /** {@inheritDoc} */ | |
| @Override | |
| void write(FSEditLogOp op) throws IOException { | |
| - int start = bufCurrent.getLength(); | |
| - | |
| - writer.writeOp(op); | |
| - | |
| - // write transaction checksum | |
| - int end = bufCurrent.getLength(); | |
| - Checksum checksum = FSEditLog.getChecksum(); | |
| - checksum.reset(); | |
| - checksum.update(bufCurrent.getData(), start, end-start); | |
| - int sum = (int)checksum.getValue(); | |
| - bufCurrent.writeInt(sum); | |
| + doubleBuf.writeOp(op); | |
| } | |
| /** {@inheritDoc} */ | |
| @Override | |
| void writeRaw(byte[] bytes, int offset, int length) throws IOException { | |
| - bufCurrent.write(bytes, offset, length); | |
| + doubleBuf.writeRaw(bytes, offset, length); | |
| } | |
| /** | |
| @@ -117,7 +98,7 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| void create() throws IOException { | |
| fc.truncate(0); | |
| fc.position(0); | |
| - bufCurrent.writeInt(FSConstants.LAYOUT_VERSION); | |
| + doubleBuf.getCurrentBuf().writeInt(FSConstants.LAYOUT_VERSION); | |
| setReadyToFlush(); | |
| flush(); | |
| } | |
| @@ -128,23 +109,11 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| // close should have been called after all pending transactions | |
| // have been flushed & synced. | |
| // if already closed, just skip | |
| - if(bufCurrent != null) | |
| - { | |
| - int bufSize = bufCurrent.size(); | |
| - if (bufSize != 0) { | |
| - throw new IOException("FSEditStream has " + bufSize | |
| - + " bytes still to be flushed and cannot " + "be closed."); | |
| - } | |
| - bufCurrent.close(); | |
| - bufCurrent = null; | |
| - writer = null; | |
| - } | |
| - | |
| - if(bufReady != null) { | |
| - bufReady.close(); | |
| - bufReady = null; | |
| + if (doubleBuf != null) { | |
| + doubleBuf.close(); | |
| + doubleBuf = null; | |
| } | |
| - | |
| + | |
| // remove the last INVALID marker from transaction log. | |
| if (fc != null && fc.isOpen()) { | |
| fc.truncate(fc.position()); | |
| @@ -156,9 +125,8 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| fp = null; | |
| } | |
| } finally { | |
| - IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp); | |
| - bufCurrent = bufReady = null; | |
| - writer = null; | |
| + IOUtils.cleanup(FSNamesystem.LOG, fc, fp); | |
| + doubleBuf = null; | |
| fc = null; | |
| fp = null; | |
| } | |
| @@ -170,12 +138,8 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| */ | |
| @Override | |
| void setReadyToFlush() throws IOException { | |
| - assert bufReady.size() == 0 : "previous data is not flushed yet"; | |
| - bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker | |
| - DataOutputBuffer tmp = bufReady; | |
| - bufReady = bufCurrent; | |
| - bufCurrent = tmp; | |
| - writer = new FSEditLogOp.Writer(bufCurrent); | |
| + doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker | |
| + doubleBuf.setReadyToFlush(); | |
| } | |
| /** | |
| @@ -185,8 +149,7 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| @Override | |
| protected void flushAndSync() throws IOException { | |
| preallocate(); // preallocate file if necessary | |
| - bufReady.writeTo(fp); // write data to file | |
| - bufReady.reset(); // erase all data in the buffer | |
| + doubleBuf.flushTo(fp); | |
| fc.force(false); // metadata updates not needed because of preallocation | |
| fc.position(fc.position() - 1); // skip back the end-of-file marker | |
| } | |
| @@ -196,7 +159,7 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| */ | |
| @Override | |
| public boolean shouldForceSync() { | |
| - return bufReady.size() >= initBufferSize; | |
| + return doubleBuf.shouldForceSync(); | |
| } | |
| /** | |
| @@ -205,8 +168,8 @@ class EditLogFileOutputStream extends EditLogOutputStream { | |
| @Override | |
| long length() throws IOException { | |
| // file size - header size + size of both buffers | |
| - return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + bufReady.size() | |
| - + bufCurrent.size(); | |
| + return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + | |
| + doubleBuf.countBufferedBytes(); | |
| } | |
| // allocate a big chunk of data | |
| diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java | |
| new file mode 100644 | |
| index 0000000..b5e0ef8 | |
| --- /dev/null | |
| +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java | |
| @@ -0,0 +1,132 @@ | |
| +/** | |
| + * Licensed to the Apache Software Foundation (ASF) under one | |
| + * or more contributor license agreements.  See the NOTICE file | |
| + * distributed with this work for additional information | |
| + * regarding copyright ownership.  The ASF licenses this file | |
| + * to you under the Apache License, Version 2.0 (the | |
| + * "License"); you may not use this file except in compliance | |
| + * with the License.  You may obtain a copy of the License at | |
| + * | |
| + *     http://www.apache.org/licenses/LICENSE-2.0 | |
| + * | |
| + * Unless required by applicable law or agreed to in writing, software | |
| + * distributed under the License is distributed on an "AS IS" BASIS, | |
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| + * See the License for the specific language governing permissions and | |
| + * limitations under the License. | |
| + */ | |
| +package org.apache.hadoop.hdfs.server.namenode; | |
| + | |
| +import java.io.IOException; | |
| +import java.io.OutputStream; | |
| + | |
| +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer; | |
| +import org.apache.hadoop.io.DataOutputBuffer; | |
| +import org.apache.hadoop.io.IOUtils; | |
| + | |
| +import com.google.common.base.Preconditions; | |
| + | |
| +/** | |
| + * A pair of in-memory buffers for serialized edit log data. Writes go to | |
| + * the "current" buffer; {@link #setReadyToFlush()} swaps it with the | |
| + * "ready" buffer, whose contents {@link #flushTo(OutputStream)} then | |
| + * writes out and discards. Keeping the two apart means new edits can be | |
| + * appended without touching data already handed off for flushing. | |
| + * <p> | |
| + * This class does no synchronization of its own. | |
| + */ | |
| +class EditsDoubleBuffer { | |
| + | |
| +  private DataOutputBuffer bufCurrent; // current buffer for writing | |
| +  private DataOutputBuffer bufReady; // buffer ready for flushing | |
| +  // initial capacity of each buffer; also the shouldForceSync() threshold | |
| +  private final int initBufferSize; | |
| +  // serializes ops into bufCurrent; recreated whenever the buffers swap | |
| +  private Writer writer; | |
| + | |
| +  /** | |
| +   * @param defaultBufferSize initial capacity of each buffer, in bytes; also | |
| +   *        the amount of ready data at which {@link #shouldForceSync()} | |
| +   *        starts returning true | |
| +   */ | |
| +  public EditsDoubleBuffer(int defaultBufferSize) { | |
| +    initBufferSize = defaultBufferSize; | |
| +    bufCurrent = new DataOutputBuffer(initBufferSize); | |
| +    bufReady = new DataOutputBuffer(initBufferSize); | |
| +    writer = new FSEditLogOp.Writer(bufCurrent); | |
| +  } | |
| + | |
| +  /** Serializes {@code op} into the current buffer. */ | |
| +  public void writeOp(FSEditLogOp op) throws IOException { | |
| +    writer.writeOp(op); | |
| +  } | |
| + | |
| +  /** Appends already-serialized bytes to the current buffer as-is. */ | |
| +  void writeRaw(byte[] bytes, int offset, int length) throws IOException { | |
| +    bufCurrent.write(bytes, offset, length); | |
| +  } | |
| + | |
| +  /** | |
| +   * Releases both buffers. Only the current buffer is checked for pending | |
| +   * data; anything left in the ready buffer is dropped without error. | |
| +   * Calling this a second time fails the null checks below. | |
| +   * | |
| +   * @throws IOException if the current buffer still holds unflushed bytes | |
| +   */ | |
| +  void close() throws IOException { | |
| +    Preconditions.checkNotNull(bufCurrent); | |
| +    Preconditions.checkNotNull(bufReady); | |
| + | |
| +    int bufSize = bufCurrent.size(); | |
| +    if (bufSize != 0) { | |
| +      throw new IOException("FSEditStream has " + bufSize | |
| +          + " bytes still to be flushed and cannot " + "be closed."); | |
| +    } | |
| + | |
| +    IOUtils.cleanup(null, bufCurrent, bufReady); | |
| +    bufCurrent = bufReady = null; | |
| +  } | |
| + | |
| +  /** | |
| +   * Swaps the buffers: everything written so far becomes ready to flush, | |
| +   * and later writes go to the former ready buffer, which is asserted to | |
| +   * be empty. | |
| +   */ | |
| +  void setReadyToFlush() { | |
| +    assert isFlushed() : "previous data not flushed yet"; | |
| +    DataOutputBuffer tmp = bufReady; | |
| +    bufReady = bufCurrent; | |
| +    bufCurrent = tmp; | |
| +    writer = new FSEditLogOp.Writer(bufCurrent); | |
| +  } | |
| + | |
| +  /** | |
| +   * Writes the ready buffer to {@code out}, then empties it. If the write | |
| +   * throws, the ready data is left in place. | |
| +   */ | |
| +  void flushTo(OutputStream out) throws IOException { | |
| +    bufReady.writeTo(out); // write ready data to the target stream | |
| +    bufReady.reset(); // erase all data in the buffer | |
| +  } | |
| + | |
| +  /** @return true once the ready buffer holds at least the initial buffer size */ | |
| +  boolean shouldForceSync() { | |
| +    return bufReady.size() >= initBufferSize; | |
| +  } | |
| + | |
| +  /** @return the buffer currently receiving writes, for appending bytes directly */ | |
| +  DataOutputBuffer getCurrentBuf() { | |
| +    return bufCurrent; | |
| +  } | |
| + | |
| +  /** @return true if the ready buffer holds no data */ | |
| +  public boolean isFlushed() { | |
| +    return bufReady.size() == 0; | |
| +  } | |
| + | |
| +  /** @return number of bytes buffered in both buffers and not yet flushed */ | |
| +  public int countBufferedBytes() { | |
| +    return bufReady.size() + bufCurrent.size(); | |
| +  } | |
| + | |
| +} | |
| diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java | |
| index b50f19d..fb6e65f 100644 | |
| --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java | |
| +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java | |
| @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; | |
| import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*; | |
| import org.apache.hadoop.security.token.delegation.DelegationKey; | |
| import org.apache.hadoop.io.BytesWritable; | |
| +import org.apache.hadoop.io.DataOutputBuffer; | |
| import org.apache.hadoop.io.LongWritable; | |
| import org.apache.hadoop.io.ArrayWritable; | |
| import org.apache.hadoop.io.Writable; | |
| @@ -1341,10 +1342,10 @@ public abstract class FSEditLogOp { | |
| * Class for writing editlog ops | |
| */ | |
| public static class Writer { | |
| - private final DataOutputStream out; | |
| + private final DataOutputBuffer buf; | |
| - public Writer(DataOutputStream out) { | |
| - this.out = out; | |
| + public Writer(DataOutputBuffer out) { | |
| + this.buf = out; | |
| } | |
| /** | |
| @@ -1354,9 +1355,15 @@ public abstract class FSEditLogOp { | |
| * @throws IOException if an error occurs during writing. | |
| */ | |
| public void writeOp(FSEditLogOp op) throws IOException { | |
| - out.writeByte(op.opCode.getOpCode()); | |
| - | |
| - op.writeFields(out); | |
| + int start = buf.getLength(); | |
| + buf.writeByte(op.opCode.getOpCode()); | |
| + op.writeFields(buf); | |
| + int end = buf.getLength(); | |
| + Checksum checksum = FSEditLog.getChecksum(); | |
| + checksum.reset(); | |
| + checksum.update(buf.getData(), start, end-start); | |
| + int sum = (int)checksum.getValue(); | |
| + buf.writeInt(sum); | |
| } | |
| } | |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.