diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..9502254 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
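
The cleaner appender now resolves its file against the same ${kafka.logs.dir} property that the other appenders in this file already use, so log-cleaner.log ends up alongside the broker logs rather than in the process's working directory. For comparison, the existing server-log appender in the stock config resolves the same way (shown as a reference, assuming the default appender name):

    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log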
diff --git a/config/server.properties b/config/server.properties
index 9dcb253..e295e84 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -102,7 +102,7 @@ log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
+log.cleaner.enable=true
#log.cleanup.policy=delete
#offsets.load.buffer.size=104857600
#offsets.load.buffer.size=52428800
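
Setting log.cleaner.enable=true starts the cleaner threads; as the comment above notes, individual logs can then be marked for compaction (per topic, via the cleanup.policy=compact override, assuming the topic-level config name that ships with log compaction). A minimal broker-side sketch using only keys that already appear in this file:

    # start the cleaner threads so compacted logs are actually processed
    log.cleaner.enable=true
    # retention-based deletion stays the default for logs not marked for compaction
    #log.cleanup.policy=delete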
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b2652dd..948696f 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -255,8 +255,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
/**
* Read from the underlying file into the buffer starting at the given position
*/
- def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
- channel.read(buffer, relativePosition + this.start)
+ def readInto(buffer: ByteBuffer): ByteBuffer = {
+ channel.read(buffer, this.start)
buffer.flip()
buffer
}
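
Dropping the relativePosition parameter means readInto always fills the buffer from the start of the (possibly sliced) FileMessageSet; callers now pick the range they want by reading a slice first. A minimal sketch of the new calling pattern, mirroring the OffsetManager call site updated later in this patch (currOffset, buffer and config.loadBufferSize are taken from that context):

    buffer.clear()
    // Log.read hands back a FileMessageSet view over the requested range;
    // readInto then copies that view into the caller's buffer from the start of the view
    val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
    messages.readInto(buffer)
    val bbset = new ByteBufferMessageSet(buffer)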
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312204c..340a05d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -213,7 +213,8 @@ class LogCleaner(val config: CleanerConfig,
* This class holds the actual logic for cleaning a log
* @param id An identifier used for logging
* @param offsetMap The map used for deduplication
- * @param bufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param ioBufferSize The initial size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param maxIoBufferSize The maximum size of the buffers to use. (The read and write buffers are allowed to grow from the initial size to this maximum size.)
* @param throttler The throttler instance to use for limiting I/O rate.
* @param time The time instance
*/
@@ -280,7 +281,6 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
*/
private[log] def cleanSegments(log: Log,
@@ -338,20 +338,21 @@ private[log] class Cleaner(val id: Int,
*/
private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
- var position = 0
- while (position < source.log.sizeInBytes) {
+ var currOffset = source.baseOffset
+ while (currOffset < source.index.lastOffset) {
checkDone(topicAndPartition)
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()
- val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
- throttler.maybeThrottle(messages.sizeInBytes)
- // check each message to see if it is to be retained
+
+ val messages = source.read(currOffset, None, readBuffer.limit()).asInstanceOf[FileMessageSet]
+ messages.readInto(readBuffer)
+ val messageSet = new ByteBufferMessageSet(readBuffer)
+ throttler.maybeThrottle(messageSet.sizeInBytes)
var messagesRead = 0
- for (entry <- messages) {
+ for (entry <- messageSet) { // deep iteration
messagesRead += 1
val size = MessageSet.entrySize(entry.message)
- position += size
stats.readMessage(size)
val key = entry.message.key
require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
@@ -366,6 +367,7 @@ private[log] class Cleaner(val id: Int,
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
stats.recopyMessage(size)
}
+ currOffset = entry.nextOffset
}
// if any messages are to be retained, write them out
if(writeBuffer.position > 0) {
@@ -374,10 +376,11 @@ private[log] class Cleaner(val id: Int,
dest.append(retained.head.offset, retained)
throttler.maybeThrottle(writeBuffer.limit)
}
-
+
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
if(readBuffer.limit > 0 && messagesRead == 0)
growBuffers()
+
}
restoreBuffers()
}
@@ -473,29 +476,31 @@ private[log] class Cleaner(val id: Int,
* @return The final offset covered by the map
*/
private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
- var position = 0
- var offset = segment.baseOffset
- while (position < segment.log.sizeInBytes) {
+ var currOffset = segment.baseOffset
+ var lastSeenOffset = currOffset
+ while (currOffset < segment.index.lastOffset) {
checkDone(topicAndPartition)
readBuffer.clear()
- val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
- throttler.maybeThrottle(messages.sizeInBytes)
- val startPosition = position
- for (entry <- messages) {
+ val messages = segment.read(currOffset, None, readBuffer.limit()).asInstanceOf[FileMessageSet]
+ messages.readInto(readBuffer)
+ val messageSet = new ByteBufferMessageSet(readBuffer)
+ var messagesRead = 0
+ for (entry <- messageSet) {
+ messagesRead += 1
val message = entry.message
require(message.hasKey)
val size = MessageSet.entrySize(message)
- position += size
map.put(message.key, entry.offset)
- offset = entry.offset
+ currOffset = entry.nextOffset
+ lastSeenOffset = entry.offset
stats.indexMessage(size)
}
- // if we didn't read even one complete message, our read buffer may be too small
- if(position == startPosition)
+ if (readBuffer.limit() > 0 && messagesRead == 0)
growBuffers()
}
+
restoreBuffers()
- offset
+ lastSeenOffset
}
}
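
Both cleanInto and buildOffsetMapForSegment now walk a segment by message offset instead of byte position, advancing with entry.nextOffset and stopping at the segment index's last offset. A condensed sketch of the shared loop shape introduced by this patch (names are taken from the hunks above; the per-message body is elided):

    var currOffset = segment.baseOffset
    while (currOffset < segment.index.lastOffset) {
      checkDone(topicAndPartition)
      readBuffer.clear()
      // read a chunk of the segment starting at the current offset, bounded by the buffer size
      val messages = segment.read(currOffset, None, readBuffer.limit()).asInstanceOf[FileMessageSet]
      messages.readInto(readBuffer)
      val messageSet = new ByteBufferMessageSet(readBuffer)
      var messagesRead = 0
      for (entry <- messageSet) {      // deep iteration over the decompressed entries
        messagesRead += 1
        // ... per-message work: update the offset map or copy retained messages ...
        currOffset = entry.nextOffset  // advance past the last message actually consumed
      }
      // a non-empty read that yielded no complete message means the I/O buffer is too small
      if (readBuffer.limit() > 0 && messagesRead == 0)
        growBuffers()
    }
    restoreBuffers()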
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 19d423f..3bfa11b 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -304,7 +304,7 @@ class OffsetManager(val config: OffsetManagerConfig,
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear()
val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
- messages.readInto(buffer, 0)
+ messages.readInto(buffer)
val bbset = new ByteBufferMessageSet(buffer)
bbset.foreach { msgAndOffset =>
require(msgAndOffset.message.key != null, "Offset entry key should not be null")