Skip to content

Instantly share code, notes, and snippets.

@toddlipcon
Created July 26, 2011 21:47
Show Gist options
  • Select an option

  • Save toddlipcon/1108150 to your computer and use it in GitHub Desktop.

Select an option

Save toddlipcon/1108150 to your computer and use it in GitHub Desktop.
commit 207d7caf5dba7f2fe8df905d1d25971b7be700e3
Author: Todd Lipcon <todd@cloudera.com>
Date: Tue Jul 26 14:45:46 2011 -0700
Refactor edit log double-buffering into a new EditsDoubleBuffer class
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
index 2685e69..5e9d539 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
@@ -17,18 +17,14 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.DataOutputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -46,21 +42,9 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
private NamenodeProtocol backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration
- private ArrayList<BufferedOp> bufCurrent; // current buffer for writing
- private ArrayList<BufferedOp> bufReady; // buffer ready for flushing
+ private EditsDoubleBuffer doubleBuf; // serialized ops, held until flushAndSync() copies them into out
private DataOutputBuffer out; // serialized output sent to backup node
-
- private static class BufferedOp {
- public final FSEditLogOpCodes opCode;
- public final byte[] bytes;
-
- public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
- this.opCode = opCode;
- this.bytes = bytes;
- }
- }
-
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
@@ -78,8 +62,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
- this.bufCurrent = new ArrayList<BufferedOp>();
- this.bufReady = new ArrayList<BufferedOp>();
+ this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); // same initial size as out
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
@@ -95,13 +78,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
@Override // EditLogOutputStream
void write(FSEditLogOp op) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream s = new DataOutputStream(baos);
- FSEditLogOp.Writer w = new FSEditLogOp.Writer(s);
- w.writeOp(op);
-
- bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray()));
- }
+ doubleBuf.writeOp(op); // NOTE(review): Writer.writeOp now also appends a per-op checksum, which the old ByteArrayOutputStream path did not; confirm the backup node expects it
+ }
@Override
void writeRaw(byte[] bytes, int offset, int length) throws IOException {
@@ -113,55 +91,39 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
*/
@Override // EditLogOutputStream
void create() throws IOException {
- bufCurrent.clear();
- assert bufReady.size() == 0 : "previous data is not flushed yet";
+ assert doubleBuf.isFlushed() : "previous data is not flushed yet";
+ this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); // start over with fresh, empty buffers
}
@Override // EditLogOutputStream
public void close() throws IOException {
// close should have been called after all pending transactions
// have been flushed & synced.
- int size = bufCurrent.size();
+ int size = doubleBuf.countBufferedBytes(); // bytes (not ops) pending in either buffer
if (size != 0) {
- throw new IOException("BackupEditStream has " + size +
- " records still to be flushed and cannot be closed.");
+ throw new IOException("BackupEditStream has " + size +
+ " bytes still to be flushed and cannot be closed.");
}
RPC.stopProxy(backupNode); // stop the RPC threads
- bufCurrent = bufReady = null;
+ doubleBuf.close(); // releases both buffers
+ doubleBuf = null;
}
@Override // EditLogOutputStream
void setReadyToFlush() throws IOException {
- assert bufReady.size() == 0 : "previous data is not flushed yet";
- ArrayList<BufferedOp> tmp = bufReady;
- bufReady = bufCurrent;
- bufCurrent = tmp;
+ doubleBuf.setReadyToFlush(); // swap the current and ready buffers
}
@Override // EditLogOutputStream
protected void flushAndSync() throws IOException {
assert out.size() == 0 : "Output buffer is not empty";
- int bufReadySize = bufReady.size();
- for(int idx = 0; idx < bufReadySize; idx++) {
- BufferedOp jRec = null;
- for(; idx < bufReadySize; idx++) {
- jRec = bufReady.get(idx);
- if(jRec.opCode.getOpCode()
- >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
- break; // special operation should be sent in a separate call to BN
- out.write(jRec.bytes, 0, jRec.bytes.length);
- }
- if(out.size() > 0)
- send(NamenodeProtocol.JA_JOURNAL);
- if(idx == bufReadySize)
- break;
- // operation like start journal spool or increment checkpoint time
- // is a separate call to BN
- out.write(jRec.bytes, 0, jRec.bytes.length);
- send(jRec.opCode.getOpCode());
+ // XXX: this code won't work in trunk, but it's redone
+ // in HDFS-1073 where it's simpler.
+ doubleBuf.flushTo(out); // ready buffer -> out; out is asserted empty above and reset below
+ if (out.size() > 0) {
+ send(NamenodeProtocol.JA_JOURNAL);
}
- bufReady.clear(); // erase all data in the buffer
out.reset(); // reset buffer to the start position
}
/**
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
index 74f5883..4413001 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
@@ -24,12 +24,9 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.zip.Checksum;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
import com.google.common.annotations.VisibleForTesting;
@@ -43,10 +40,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
- private DataOutputBuffer bufCurrent; // current buffer for writing
- private DataOutputBuffer bufReady; // buffer ready for flushing
- private FSEditLogOp.Writer writer;
- final private int initBufferSize; // inital buffer size
+ private EditsDoubleBuffer doubleBuf; // ops accumulate here; flushAndSync() writes the ready half to fp
static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
static {
@@ -68,10 +62,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
EditLogFileOutputStream(File name, int size) throws IOException {
super();
file = name;
- initBufferSize = size;
- bufCurrent = new DataOutputBuffer(size);
- bufReady = new DataOutputBuffer(size);
- writer = new FSEditLogOp.Writer(bufCurrent);
+ doubleBuf = new EditsDoubleBuffer(size); // size is also the shouldForceSync() threshold
RandomAccessFile rp = new RandomAccessFile(name, "rw");
fp = new FileOutputStream(rp.getFD()); // open for append
fc = rp.getChannel();
@@ -91,23 +82,13 @@ class EditLogFileOutputStream extends EditLogOutputStream {
/** {@inheritDoc} */
@Override
void write(FSEditLogOp op) throws IOException {
- int start = bufCurrent.getLength();
-
- writer.writeOp(op);
-
- // write transaction checksum
- int end = bufCurrent.getLength();
- Checksum checksum = FSEditLog.getChecksum();
- checksum.reset();
- checksum.update(bufCurrent.getData(), start, end-start);
- int sum = (int)checksum.getValue();
- bufCurrent.writeInt(sum);
+ doubleBuf.writeOp(op); // opcode, fields and trailing checksum go into the current buffer
}
/** {@inheritDoc} */
@Override
void writeRaw(byte[] bytes, int offset, int length) throws IOException {
- bufCurrent.write(bytes, offset, length);
+ doubleBuf.writeRaw(bytes, offset, length); // copied verbatim, no checksum added
}
/**
@@ -117,7 +98,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
void create() throws IOException {
fc.truncate(0);
fc.position(0);
- bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+ doubleBuf.getCurrentBuf().writeInt(FSConstants.LAYOUT_VERSION); // header written directly, not as an op
setReadyToFlush();
flush();
}
@@ -128,23 +109,11 @@ class EditLogFileOutputStream extends EditLogOutputStream {
// close should have been called after all pending transactions
// have been flushed & synced.
// if already closed, just skip
- if(bufCurrent != null)
- {
- int bufSize = bufCurrent.size();
- if (bufSize != 0) {
- throw new IOException("FSEditStream has " + bufSize
- + " bytes still to be flushed and cannot " + "be closed.");
- }
- bufCurrent.close();
- bufCurrent = null;
- writer = null;
- }
-
- if(bufReady != null) {
- bufReady.close();
- bufReady = null;
+ if (doubleBuf != null) {
+ doubleBuf.close(); // throws IOException if the current buffer still holds bytes
+ doubleBuf = null;
}
-
+
// remove the last INVALID marker from transaction log.
if (fc != null && fc.isOpen()) {
fc.truncate(fc.position());
@@ -156,9 +125,8 @@ class EditLogFileOutputStream extends EditLogOutputStream {
fp = null;
}
} finally {
- IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
- bufCurrent = bufReady = null;
- writer = null;
+ IOUtils.cleanup(FSNamesystem.LOG, fc, fp);
+ doubleBuf = null;
fc = null;
fp = null;
}
@@ -170,12 +138,8 @@ class EditLogFileOutputStream extends EditLogOutputStream {
*/
@Override
void setReadyToFlush() throws IOException {
- assert bufReady.size() == 0 : "previous data is not flushed yet";
- bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
- DataOutputBuffer tmp = bufReady;
- bufReady = bufCurrent;
- bufCurrent = tmp;
- writer = new FSEditLogOp.Writer(bufCurrent);
+ doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+ doubleBuf.setReadyToFlush(); // the marker-terminated buffer becomes the ready buffer
}
/**
@@ -185,8 +149,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
@Override
protected void flushAndSync() throws IOException {
preallocate(); // preallocate file if necessary
- bufReady.writeTo(fp); // write data to file
- bufReady.reset(); // erase all data in the buffer
+ doubleBuf.flushTo(fp); // write the ready buffer to the file and clear it
fc.force(false); // metadata updates not needed because of preallocation
fc.position(fc.position() - 1); // skip back the end-of-file marker
}
@@ -196,7 +159,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
*/
@Override
public boolean shouldForceSync() {
- return bufReady.size() >= initBufferSize;
+ return doubleBuf.shouldForceSync(); // ready buffer has reached the initial buffer size
}
/**
@@ -205,8 +168,8 @@ class EditLogFileOutputStream extends EditLogOutputStream {
@Override
long length() throws IOException {
// file size - header size + size of both buffers
- return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + bufReady.size()
- + bufCurrent.size();
+ return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES +
+ doubleBuf.countBufferedBytes(); // bytes in the ready and current buffers
}
// allocate a big chunk of data
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
new file mode 100644
index 0000000..b5e0ef8
--- /dev/null
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A pair of in-memory buffers used by the edit log output streams.
+ * <p>
+ * Ops are serialized into the <em>current</em> buffer.
+ * {@link #setReadyToFlush()} swaps it with the <em>ready</em> buffer, and
+ * {@link #flushTo(OutputStream)} writes the ready buffer out and clears it,
+ * so new ops can keep accumulating while a previously swapped-out batch
+ * waits to be written. This class performs no synchronization of its own.
+ */
+class EditsDoubleBuffer {
+
+  private DataOutputBuffer bufCurrent; // current buffer for writing
+  private DataOutputBuffer bufReady; // buffer ready for flushing
+  private final int initBufferSize;
+  private Writer writer; // always bound to bufCurrent
+
+  /**
+   * @param defaultBufferSize initial capacity of each buffer; also the
+   *        ready-buffer size at which {@link #shouldForceSync()} returns true
+   */
+  public EditsDoubleBuffer(int defaultBufferSize) {
+    initBufferSize = defaultBufferSize;
+    bufCurrent = new DataOutputBuffer(initBufferSize);
+    bufReady = new DataOutputBuffer(initBufferSize);
+    writer = new FSEditLogOp.Writer(bufCurrent);
+  }
+
+  /** Serializes {@code op} into the current buffer. */
+  public void writeOp(FSEditLogOp op) throws IOException {
+    writer.writeOp(op);
+  }
+
+  /** Appends raw bytes to the current buffer as-is (no opcode or checksum). */
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    bufCurrent.write(bytes, offset, length);
+  }
+
+  /**
+   * Releases both buffers. Only the current buffer is checked for pending
+   * data; anything still in the ready buffer is discarded.
+   *
+   * @throws IOException if the current buffer still holds unflushed bytes
+   * @throws NullPointerException if this buffer has already been closed
+   */
+  void close() throws IOException {
+    Preconditions.checkNotNull(bufCurrent);
+    Preconditions.checkNotNull(bufReady);
+
+    int bufSize = bufCurrent.size();
+    if (bufSize != 0) {
+      throw new IOException("FSEditStream has " + bufSize
+          + " bytes still to be flushed and cannot " + "be closed.");
+    }
+
+    IOUtils.cleanup(null, bufCurrent, bufReady);
+    bufCurrent = bufReady = null;
+  }
+
+  /**
+   * Swaps the current and ready buffers and rebinds the writer to the new,
+   * empty current buffer. The ready buffer must already have been flushed.
+   */
+  void setReadyToFlush() {
+    assert isFlushed() : "previous data not flushed yet";
+    DataOutputBuffer tmp = bufReady;
+    bufReady = bufCurrent;
+    bufCurrent = tmp;
+    writer = new FSEditLogOp.Writer(bufCurrent);
+  }
+
+  /**
+   * Writes the ready buffer to {@code out} and then clears it.
+   * {@code out} itself is neither flushed nor closed.
+   */
+  void flushTo(OutputStream out) throws IOException {
+    bufReady.writeTo(out); // copy buffered bytes to out
+    bufReady.reset(); // erase all data in the buffer
+  }
+
+  /** @return true once the ready buffer has reached the initial buffer size */
+  boolean shouldForceSync() {
+    return bufReady.size() >= initBufferSize;
+  }
+
+  /** @return the buffer currently accepting writes */
+  DataOutputBuffer getCurrentBuf() {
+    return bufCurrent;
+  }
+
+  /** @return true if the ready buffer is empty; bufCurrent is not checked */
+  public boolean isFlushed() {
+    return bufReady.size() == 0;
+  }
+
+  /** @return the total number of bytes held in both buffers */
+  public int countBufferedBytes() {
+    return bufReady.size() + bufCurrent.size();
+  }
+
+}
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index b50f19d..fb6e65f 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
@@ -1341,10 +1342,10 @@ public abstract class FSEditLogOp {
* Class for writing editlog ops
*/
public static class Writer {
- private final DataOutputStream out;
+ private final DataOutputBuffer buf; // concrete DataOutputBuffer so getData()/getLength() can feed the checksum
- public Writer(DataOutputStream out) {
- this.out = out;
+ public Writer(DataOutputBuffer out) { // NOTE(review): parameter type narrowed from DataOutputStream; check remaining callers
+ this.buf = out;
}
/**
@@ -1354,9 +1355,15 @@ public abstract class FSEditLogOp {
* @throws IOException if an error occurs during writing.
*/
public void writeOp(FSEditLogOp op) throws IOException {
- out.writeByte(op.opCode.getOpCode());
-
- op.writeFields(out);
+ int start = buf.getLength(); // offset of this op's opcode byte
+ buf.writeByte(op.opCode.getOpCode());
+ op.writeFields(buf);
+ int end = buf.getLength();
+ Checksum checksum = FSEditLog.getChecksum();
+ checksum.reset();
+ checksum.update(buf.getData(), start, end-start); // covers the opcode byte and the serialized fields
+ int sum = (int)checksum.getValue();
+ buf.writeInt(sum); // 4-byte checksum trails every op
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment