@rbranson
Created October 19, 2011 22:40
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.cassandra.db.commitlog;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.MappedByteBuffer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.net.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.ColumnFamily;
/**
* A single commit log file on disk. Manages creation of the file and writing row
* mutations to disk. This class is thread-safe and supports concurrent invocations
* of write() along what should be a non-blocking path.
*/
public class CommitLogSegment
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log");
// The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
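// On-disk layout of a single entry, as written by write() below:
//   [int    length of the serialized mutation]
//   [long   CRC32 over the length            ]
//   [byte[] serialized RowMutation           ]
//   [long   CRC32 over the serialized data   ]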
private static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
public final long id;
private volatile long finalSize = -1;
private final File logFile;
private final RandomAccessFile logFileAccessor;
private final AtomicLong logTailPosition;
private final AtomicInteger writerCount;
// The first mapped log buffer
private final MappedByteBuffer logBuffer;
// All of the log buffers, including overflow, so we can sync them
private final List<MappedByteBuffer> allLogBuffers;
// cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
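// e.g. a write for cfId 42 at position 1024 leaves 42 -> 1024 in the map; once that CF is
// flushed past 1024, markCFCleanIfNotWritten removes the entry, and the segment becomes
// safe to delete when the map is empty (see isSafeToDelete()).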
public final ConcurrentHashMap<Integer, Integer> cfLastWrite = new ConcurrentHashMap<Integer, Integer>();
public CommitLogSegment()
{
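// The segment id is its creation time in milliseconds; it doubles as the file name suffix
// and as the segment component of every ReplayPosition issued by this segment.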
id = System.currentTimeMillis();
String filePath = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + id + ".log";
logger.info("Creating new commitlog segment " + filePath);
writerCount = new AtomicInteger();
allLogBuffers = Collections.synchronizedList(new ArrayList<MappedByteBuffer>());
try
{
// Open the initial segment file
logFile = new File(filePath);
logFileAccessor = new RandomAccessFile(logFile, "rw");
// Map the segment, extending it to the standard segment size
logFileAccessor.setLength(CommitLog.SEGMENT_SIZE);
logBuffer = mapLogRegion(logFileAccessor, CommitLog.SEGMENT_SIZE, 0);
allLogBuffers.add(logBuffer);
// This starts the tail position at byte 0
logTailPosition = new AtomicLong();
}
catch (IOException e)
{
throw new IOError(e);
}
}
/**
* Extracts the commit log ID from @filename
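* e.g. "CommitLog-1319063999000.log" yields 1319063999000; non-matching names return -1.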
*/
public static long idFromFilename(String filename)
{
Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
try
{
if (matcher.matches())
return Long.valueOf(matcher.group(1));
else
return -1L;
}
catch (NumberFormatException e)
{
return -1L;
}
}
/**
* Returns true if @filename could be a commit log based on its filename
*/
public static boolean possibleCommitLogFile(String filename)
{
return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
}
/**
* Appends a row mutation onto the commit log, returning a ReplayPosition
* for the row.
*/
public ReplayPosition write(RowMutation rowMutation) throws IOException
{
if (finalSize >= 0)
return null;
int writerNum = writerCount.incrementAndGet();
try
{
// It really is too bad Thrift's TSerializer will only return a byte[] and won't
// serialize directly to a ByteBuffer, otherwise we could do some amazing zero-copy
// stuff here.
byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
long entryLen = ENTRY_OVERHEAD_SIZE + serializedRow.length;
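// Atomically advancing the tail reserves a private [position, position + entryLen) slice
// of the file for this writer, so concurrent write() calls never overlap.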
ReplayPosition repPos = getAndAdvanceCurrentPosition(entryLen);
// Safely get a chunk of the byte buffer, ensuring enough space to store the data
ByteBuffer buf = getMappedBufferSafe(repPos.position, entryLen);
for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
{
// check for null cfm in case a cl write goes through after the cf is
// dropped but before a new segment is created.
CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
if (cfm == null)
{
logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id());
}
else
{
logger.info("ColumnFamily " + cfm.ksName + "::" + cfm.cfName + " has ID " + cfm.cfId);
markCFDirty(cfm.cfId, repPos.position);
}
}
// Write the actual mutation information into the log file with a
// checksum on the size up front, as well as a checksum on the data
// at the tail.
Checksum checksum = new CRC32();
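// Note: Checksum.update(int) consumes only the low eight bits of its argument, so this
// head checksum covers a single byte of the length field rather than all four bytes.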
checksum.update(serializedRow.length);
buf.putInt(serializedRow.length);
buf.putLong(checksum.getValue());
buf.put(serializedRow);
checksum.update(serializedRow, 0, serializedRow.length);
buf.putLong(checksum.getValue());
return repPos;
}
finally
{
writerCount.decrementAndGet();
}
}
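// A minimal usage sketch (hypothetical caller; assumes an already-populated RowMutation
// named "mutation"):
//
//   CommitLogSegment segment = new CommitLogSegment();
//   ReplayPosition pos = segment.write(mutation); // returns null once close() has started
//   segment.sync();                               // force the mapped buffers to disk
//   segment.close();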
/**
* Forces a disk flush for this segment file.
*/
public void sync() throws IOException
{
synchronized(allLogBuffers)
{
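// force() flushes any dirty pages of the mapping back to the storage device (typically
// via msync() on the mapped region); syncing every buffer also covers overflow mappings.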
for (MappedByteBuffer buf : allLogBuffers)
{
buf.force();
}
}
}
/**
* Returns the current ReplayPosition for this log segment
*/
public ReplayPosition getCurrentPosition()
{
return getReplayPositionAt(logTailPosition.get());
}
/**
* Returns a ReplayPosition with the tail advanced, to reserve space
* for a new log entry.
*
* @advanceBy The byte length of the reserved area.
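* e.g. two concurrent writers reserving 100 and 200 bytes are handed adjacent,
* non-overlapping regions of the file, in whichever order the getAndAdd lands.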
*/
protected ReplayPosition getAndAdvanceCurrentPosition(long advanceBy)
{
return getReplayPositionAt(logTailPosition.getAndAdd(advanceBy));
}
private ReplayPosition getReplayPositionAt(long position)
{
assert position <= Integer.MAX_VALUE;
return new ReplayPosition(id, (int) position);
}
/**
* Returns the file path to this segment
*/
public String getPath()
{
return logFile.getPath();
}
// Is this even used?
public String getName()
{
String path = getPath();
return path.substring(path.lastIndexOf(File.separator) + 1);
}
/**
* Return the current log file length
*/
public long length()
{
if (finalSize >= 0)
return finalSize;
return logFile.length();
}
/**
* Close the segment, which will wait for all of the writes to complete, and force
* a disk flush.
*/
public void close()
{
if (finalSize >= 0)
return;
// Once finalSize is non-negative, write() returns null immediately, so no new writers can enter
finalSize = logFile.length();
// Spin waiting for writers to finish
while (writerCount.get() > 0)
{
Thread.yield();
}
try
{
sync();
// Get the real finalSize after we're finished.
finalSize = logFile.length();
logFileAccessor.close();
}
catch (IOException e)
{
throw new IOError(e);
}
}
/**
* Thread-safe way to get a correctly positioned buffer for this log segment
* with enough space to store @bytesNeeded.
*/
private ByteBuffer getMappedBufferSafe(long position, long bytesNeeded) throws IOException
{
// FUTURE: The API uses longs to be compatible with JVM 7 support of 64-bit buffers
assert position <= Integer.MAX_VALUE;
assert bytesNeeded <= Integer.MAX_VALUE;
// The reason we can get away with this is that logBuffer is allocated once in the
// constructor and its internal state (mark, position, limit, etc.) is never modified,
// so duplicate() safely gives each writer an independent view of the same mapping.
ByteBuffer buf = logBuffer.duplicate();
if (position < buf.limit())
{
// advance the buf to position
buf.position((int) position);
}
else
{
// we can't even fit the position within the buf, so it needs to be extended
buf = null;
}
if (buf == null || buf.remaining() < bytesNeeded)
{
// if we don't have enough space in the buffer, map a new overflow region at this offset.
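// The overflow mapping starts at the entry's file offset, so position 0 of the returned
// buffer corresponds to @position in the file and the caller can write from its start.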
MappedByteBuffer mbuf = mapLogRegion(logFileAccessor, bytesNeeded, position);
synchronized(allLogBuffers)
{
allLogBuffers.add(mbuf);
}
return mbuf;
}
return buf;
}
/**
* Memory-maps a region of the file backing @raf, starting at @offset and spanning @size bytes
*/
private static MappedByteBuffer mapLogRegion(RandomAccessFile raf, long size, long offset) throws IOException
{
return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, offset, size);
}
/**
* Marks the ColumnFamily identified by @cfId as dirty, recording @position as its latest write.
*/
private void markCFDirty(Integer cfId, Integer position)
{
// First, try putting the key into the dirty map if it doesn't exist
Integer lastWritten = cfLastWrite.putIfAbsent(cfId, position);
// If it exists, lastWritten will be the current value for that key, and
// we'll keep trying to set it, updating lastWritten, as long as lastWritten
// is less than the position requested.
while (lastWritten != null && lastWritten < position)
{
// The replace is atomic, so if it's been modified since we checked
// the value, it won't replace it.
if (cfLastWrite.replace(cfId, lastWritten, position))
{
return;
}
else
{
lastWritten = cfLastWrite.get(cfId);
}
}
}
/**
* Mark the CF clean only if there has been no write since the flush
* position was grabbed.
*/
public void markCFCleanIfNotWritten(Integer cfId, Integer flushPosition)
{
Integer lastWritten = cfLastWrite.get(cfId);
// Keep trying to remove the entry while the last recorded write is older than
// flushPosition. If another thread writes concurrently, lastWritten advances past
// flushPosition and the CF stays dirty; if the entry is already gone, there is nothing to do.
while (lastWritten != null && lastWritten < flushPosition)
{
// This is atomic, so cfId in cfLastWrite will only be removed if the
// position has stayed the same, otherwise it's still dirty.
if (cfLastWrite.remove(cfId, lastWritten))
{
return;
}
else
{
lastWritten = cfLastWrite.get(cfId);
Thread.yield();
}
}
}
/**
* Marks the ColumnFamily specified by @cfId as clean for this log segment.
*/
public void markCFClean(Integer cfId)
{
cfLastWrite.remove(cfId);
}
// For debugging, not fast
public String dirtyString()
{
StringBuilder sb = new StringBuilder();
for (Integer cfId : cfLastWrite.keySet())
{
CFMetaData m = Schema.instance.getCFMetaData(cfId);
sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("), ");
}
return sb.toString();
}
/**
* Returns true if this segment is safe to delete
*/
public boolean isSafeToDelete()
{
return cfLastWrite.isEmpty();
}
@Override
public String toString()
{
return "CommitLogSegment(" + getPath() + ')';
}
}