Created October 19, 2011 22:40
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.cassandra.db.commitlog;

import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.MappedByteBuffer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.net.MessagingService;

/**
 * A single commit log file on disk. Manages creation of the file and writing row
 * mutations to disk. This class is thread-safe and supports concurrent invocations
 * of write() along what should be a non-blocking path.
 */
public class CommitLogSegment
{
    private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);

    private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log");

    // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
    private static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;

    public final long id;

    private volatile long finalSize = -1;

    private final File logFile;
    private final RandomAccessFile logFileAccessor;
    private final AtomicLong logTailPosition;
    private final AtomicInteger writerCount;

    // The first mapped log buffer
    private final MappedByteBuffer logBuffer;

    // All of the log buffers, including overflow, so we can sync them
    private final List<MappedByteBuffer> allLogBuffers;

    // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
    public final ConcurrentHashMap<Integer, Integer> cfLastWrite = new ConcurrentHashMap<Integer, Integer>();

    public CommitLogSegment()
    {
        id = System.currentTimeMillis();
        String filePath = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + id + ".log";
        logger.info("Creating new commitlog segment " + filePath);

        writerCount = new AtomicInteger();
        allLogBuffers = Collections.synchronizedList(new ArrayList<MappedByteBuffer>());

        try
        {
            // Open the initial segment file
            logFile = new File(filePath);
            logFileAccessor = new RandomAccessFile(logFile, "rw");

            // Map the segment, extending it to the standard segment size
            logFileAccessor.setLength(CommitLog.SEGMENT_SIZE);
            logBuffer = mapLogRegion(logFileAccessor, CommitLog.SEGMENT_SIZE, 0);
            allLogBuffers.add(logBuffer);

            // This starts the tail position at byte 0
            logTailPosition = new AtomicLong();
        }
        catch (IOException e)
        {
            throw new IOError(e);
        }
    }

    /**
     * Extracts the commit log ID from @filename
     */
    public static long idFromFilename(String filename)
    {
        Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
        try
        {
            if (matcher.matches())
                return Long.valueOf(matcher.group(1));
            else
                return -1L;
        }
        catch (NumberFormatException e)
        {
            return -1L;
        }
    }

    /**
     * Returns true if @filename could be a commit log based on its filename
     */
    public static boolean possibleCommitLogFile(String filename)
    {
        return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
    }

    /**
     * Appends a row mutation onto the commit log, returning a ReplayPosition
     * for the row.
     */
    public ReplayPosition write(RowMutation rowMutation) throws IOException
    {
        // Register as a writer before checking finalSize so that a concurrent
        // close() cannot complete while this write is still in flight.
        writerCount.incrementAndGet();
        try
        {
            if (finalSize >= 0)
                return null;

            // It really is too bad Thrift's TSerializer will only return a byte[] and won't
            // serialize directly to a ByteBuffer, otherwise we could do some amazing zero-copy
            // stuff here.
            byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
            long entryLen = ENTRY_OVERHEAD_SIZE + serializedRow.length;

            ReplayPosition repPos = getAndAdvanceCurrentPosition(entryLen);

            // Safely get a chunk of the byte buffer, ensuring enough space to store the data
            ByteBuffer buf = getMappedBufferSafe(repPos.position, entryLen);

            for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
            {
                // check for null cfm in case a cl write goes through after the cf is
                // dropped but before a new segment is created.
                CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
                if (cfm == null)
                {
                    logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id());
                }
                else
                {
                    logger.debug("ColumnFamily " + cfm.ksName + "::" + cfm.cfName + " has ID " + cfm.cfId);
                    markCFDirty(cfm.cfId, repPos.position);
                }
            }

            // Write the actual mutation information into the log file with a
            // checksum on the size up front, as well as a checksum on the data
            // at the tail.
            Checksum checksum = new CRC32();
            checksum.update(serializedRow.length);
            buf.putInt(serializedRow.length);
            buf.putLong(checksum.getValue());
            buf.put(serializedRow);
            checksum.update(serializedRow, 0, serializedRow.length);
            buf.putLong(checksum.getValue());

            return repPos;
        }
        finally
        {
            writerCount.decrementAndGet();
        }
    }

    /**
     * Forces a disk flush for this segment file.
     */
    public void sync() throws IOException
    {
        synchronized (allLogBuffers)
        {
            for (MappedByteBuffer buf : allLogBuffers)
            {
                buf.force();
            }
        }
    }

    /**
     * Returns the current ReplayPosition for this log segment
     */
    public ReplayPosition getCurrentPosition()
    {
        return getReplayPositionAt(logTailPosition.get());
    }

    /**
     * Returns a ReplayPosition with the tail advanced, to reserve space
     * for a new log entry.
     *
     * @advanceBy The byte length of the reserved area.
     */
    protected ReplayPosition getAndAdvanceCurrentPosition(long advanceBy)
    {
        return getReplayPositionAt(logTailPosition.getAndAdd(advanceBy));
    }

    private ReplayPosition getReplayPositionAt(long position)
    {
        assert position <= Integer.MAX_VALUE;
        return new ReplayPosition(id, (int) position);
    }

    /**
     * Returns the file path to this segment
     */
    public String getPath()
    {
        return logFile.getPath();
    }

    // Is this even used?
    public String getName()
    {
        String path = getPath();
        return path.substring(path.lastIndexOf(File.separator) + 1);
    }

    /**
     * Return the current log file length
     */
    public long length()
    {
        if (finalSize >= 0)
            return finalSize;

        return logFile.length();
    }

    /**
     * Close the segment, which will wait for all of the writes to complete, and force
     * a disk flush.
     */
    public void close()
    {
        if (finalSize >= 0)
            return;

        // Effectively blocks new writers from starting
        finalSize = logFile.length();

        // Spin waiting for in-flight writers to finish
        while (writerCount.get() > 0)
        {
            Thread.yield();
        }

        try
        {
            sync();

            // Get the real finalSize after we're finished.
            finalSize = logFile.length();
            logFileAccessor.close();
        }
        catch (IOException e)
        {
            throw new IOError(e);
        }
    }

    /**
     * Thread-safe way to get a correctly positioned buffer for this log segment
     * with enough space to store @bytesNeeded.
     */
    private ByteBuffer getMappedBufferSafe(long position, long bytesNeeded) throws IOException
    {
        // FUTURE: The API uses longs to be compatible with JVM 7 support of 64-bit buffers
        assert position <= Integer.MAX_VALUE;
        assert bytesNeeded <= Integer.MAX_VALUE;

        // The reason why we can get away with this is because logBuffer is immutable,
        // allocated once in the constructor, and its internal state (mark, position,
        // limit, etc.) is never modified.
        ByteBuffer buf = logBuffer.duplicate();

        if (position < buf.limit())
        {
            // advance the buf to position
            buf.position((int) position);
        }
        else
        {
            // we can't even fit the position within the buf, so it needs to be extended
            buf = null;
        }

        if (buf == null || buf.remaining() < bytesNeeded)
        {
            // if we don't have enough space in the buffer, map a new overflow region at this offset.
            MappedByteBuffer mbuf = mapLogRegion(logFileAccessor, bytesNeeded, position);
            synchronized (allLogBuffers)
            {
                allLogBuffers.add(mbuf);
            }
            return mbuf;
        }

        return buf;
    }

    /**
     * Memory maps a region of @size bytes starting at @offset in the file specified by @raf
     */
    private static MappedByteBuffer mapLogRegion(RandomAccessFile raf, long size, long offset) throws IOException
    {
        return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, offset, size);
    }

    /**
     * Marks the ColumnFamily specified by @cfId as dirty, recording @position as its latest write.
     */
    private void markCFDirty(Integer cfId, Integer position)
    {
        // First, try putting the key into the dirty map if it doesn't exist
        Integer lastWritten = cfLastWrite.putIfAbsent(cfId, position);

        // If it exists, lastWritten will be the current value for that key, and
        // we'll keep trying to set it, updating lastWritten, as long as lastWritten
        // is less than the position requested.
        while (lastWritten != null && lastWritten < position)
        {
            // The replace is atomic, so if it's been modified since we checked
            // the value, it won't replace it.
            if (cfLastWrite.replace(cfId, lastWritten, position))
            {
                return;
            }
            else
            {
                lastWritten = cfLastWrite.get(cfId);
            }
        }
    }

    /**
     * Mark the CF clean only if there has been no write since the flush
     * position was grabbed.
     */
    public void markCFCleanIfNotWritten(Integer cfId, Integer flushPosition)
    {
        Integer lastWritten = cfLastWrite.get(cfId);

        // Keep trying to remove the entry as long as it exists and its last write
        // is older than flushPosition; if another thread writes concurrently,
        // lastWritten becomes null or advances past flushPosition and we stop.
        while (lastWritten != null && lastWritten < flushPosition)
        {
            // This is atomic, so cfId in cfLastWrite will only be removed if the
            // position has stayed the same, otherwise it's still dirty.
            if (cfLastWrite.remove(cfId, lastWritten))
            {
                return;
            }
            else
            {
                lastWritten = cfLastWrite.get(cfId);
                Thread.yield();
            }
        }
    }

    /**
     * Marks the ColumnFamily specified by @cfId as clean for this log segment.
     */
    public void markCFClean(Integer cfId)
    {
        cfLastWrite.remove(cfId);
    }

    // For debugging, not fast
    public String dirtyString()
    {
        StringBuilder sb = new StringBuilder();
        for (Integer cfId : cfLastWrite.keySet())
        {
            CFMetaData m = Schema.instance.getCFMetaData(cfId);
            sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("), ");
        }
        return sb.toString();
    }

    /**
     * Returns true if this segment is safe to delete
     */
    public boolean isSafeToDelete()
    {
        return cfLastWrite.isEmpty();
    }

    @Override
    public String toString()
    {
        return "CommitLogSegment(" + getPath() + ')';
    }
}
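
For readers skimming the gist, here is a minimal, hypothetical sketch of how a caller might drive a segment end to end, plus a decoder that mirrors the entry layout produced by write(). It assumes the Cassandra 1.0-era types referenced above (RowMutation, ReplayPosition, CommitLog) are on the classpath and that ReplayPosition lives in org.apache.cassandra.db.commitlog, as its unqualified use above suggests; the class name CommitLogSegmentExample and the methods exampleUsage() and readEntry() are illustrative only, not part of the gist.

// Hypothetical usage/decoding sketch -- not part of the gist above.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.ReplayPosition;

public class CommitLogSegmentExample
{
    public static void exampleUsage(RowMutation mutation, Integer cfId) throws IOException
    {
        CommitLogSegment segment = new CommitLogSegment();

        // Reserve space in the mapped file and append the serialized mutation;
        // write() returns null if the segment has already been closed.
        ReplayPosition pos = segment.write(mutation);
        if (pos == null)
            return;

        // Force the mapped buffers to disk before acknowledging the write.
        segment.sync();

        // Once the memtable for cfId has been flushed past pos, the segment can
        // be marked clean; when no CF is dirty it is safe to delete.
        segment.markCFCleanIfNotWritten(cfId, pos.position);
        if (segment.isSafeToDelete())
            segment.close();
    }

    // Decodes a single entry using the layout written by write():
    // [int length][long CRC of length][length bytes of row data][long CRC of length + data].
    public static byte[] readEntry(ByteBuffer buf) throws IOException
    {
        CRC32 checksum = new CRC32();

        int length = buf.getInt();
        checksum.update(length);                 // same single-int update as write()
        if (buf.getLong() != checksum.getValue())
            throw new IOException("bad header checksum");

        byte[] data = new byte[length];
        buf.get(data);
        checksum.update(data, 0, data.length);   // tail CRC continues the same checksum
        if (buf.getLong() != checksum.getValue())
            throw new IOException("bad data checksum");

        return data;
    }
}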