KafkaOffsetManager
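A small helper for Kafka's old low-level offset-commit API (ConsumerMetadataRequest / OffsetCommitRequest): callers record per-partition offsets as async jobs start and finish, and a background thread periodically commits, for each partition, the highest offset up to which every job has finished.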
package com.yahoo.mail.force.asyncf.consumer;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.network.BlockingChannel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.utils.ClientUtils;

import com.google.common.base.Preconditions;
/**
 * An offset manager which commits Kafka partitions' offsets in the background.
 */
public class OffsetManager {

    private final static Log LOG = LogFactory.getLog(OffsetManager.class);

    /** Per partition: offset -> status of the job consuming that offset, ordered by offset. */
    private final ConcurrentMap<Integer, ConcurrentSkipListMap<Long, JobStatus>> partitionsAndOffsets = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors
            .newScheduledThreadPool(1);

    private final static String CLIENT_ID = "test";
    private final static int NO_OF_RETRIES = 5;

    private final long delay;
    private final TimeUnit unit;
    private final String topicName;
    private final List<InetSocketAddress> brokerNames;
    private final String consumerName;
    public OffsetManager(
            String brokerNames, String topicName, String consumerName,
            long delay, TimeUnit unit) {
        Preconditions.checkNotNull(brokerNames);
        this.delay = delay;
        this.unit = unit;
        this.brokerNames = ClientUtils
                .parseAndValidateAddresses(getUrls(brokerNames));
        Preconditions.checkArgument(this.brokerNames.size() > 0);
        this.topicName = topicName;
        this.consumerName = consumerName;
    }

    private static List<String> getUrls(String brokerNames) {
        String[] brokers = brokerNames.split(",");
        return Arrays.asList(brokers);
    }
    public void startOffsetUpdates() {
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    commitOffsetsWithBroker();
                }
                // Catch everything: an exception escaping a scheduled task
                // would silently cancel all of its future executions.
                catch (Exception e) {
                    LOG.error("Exception occurred while updating offset.", e);
                }
            }
        }, 0, delay, unit);
    }

    public void stopOffsetUpdates() {
        scheduledExecutorService.shutdown();
    }
    /** Records that a job has started consuming the given offset. */
    public void addOffsetForPartition(int partitionId, long offset) {
        if (!partitionsAndOffsets.containsKey(partitionId))
            this.partitionsAndOffsets.putIfAbsent(partitionId,
                    new ConcurrentSkipListMap<>());
        this.partitionsAndOffsets.get(partitionId).put(offset,
                JobStatus.RUNNING);
    }

    /** Marks the given offset as finished, so the committer may advance past it. */
    public void updateOffsetForPartition(int partitionId, long offset) {
        // Note: the check must be negated (unlike the original), otherwise an
        // unknown partition would never get a map and get() would NPE below.
        if (!partitionsAndOffsets.containsKey(partitionId))
            this.partitionsAndOffsets.putIfAbsent(partitionId,
                    new ConcurrentSkipListMap<>());
        this.partitionsAndOffsets.get(partitionId).put(offset,
                JobStatus.FINISHED);
    }
    private void commitOffsetsWithBroker() throws InterruptedException {
        LOG.info("Updating offset - starting.");
        for (Map.Entry<Integer, ConcurrentSkipListMap<Long, JobStatus>> entry : partitionsAndOffsets
                .entrySet()) {
            int partitionId = entry.getKey();
            ConcurrentSkipListMap<Long, JobStatus> sMap = entry.getValue();
            // Walk offsets in ascending order and find the highest offset up to
            // which every job has finished; only that prefix is safe to commit.
            Long offset = null;
            for (Map.Entry<Long, JobStatus> offsets : sMap.entrySet()) {
                if (offsets.getValue() == JobStatus.FINISHED) {
                    offset = offsets.getKey();
                    sMap.remove(offsets.getKey());
                }
                else
                    break;
            }
            if (offset != null) {
                // Retry with exponential backoff, rotating through the brokers
                // round robin (modulo, not division, to stay within bounds).
                long sleepTime = 1000;
                int brokerIndex = 0;
                boolean successful = false;
                for (int i = 1; i <= NO_OF_RETRIES; i++) {
                    successful = commitOffset(
                            brokerNames.get(brokerIndex).getHostName(),
                            brokerNames.get(brokerIndex).getPort(),
                            consumerName, topicName, partitionId, offset);
                    if (successful)
                        break;
                    Thread.sleep(sleepTime);
                    sleepTime *= 2;
                    brokerIndex = (brokerIndex + 1) % brokerNames.size();
                }
                if (successful)
                    LOG.info("Updated offset for partitionId=" + partitionId
                            + " offset=" + offset);
                else
                    LOG.error("Failed to update offset for partitionId="
                            + partitionId + " offset=" + offset + " after "
                            + NO_OF_RETRIES + " attempts.");
            }
        }
        LOG.info("Updating offset - finished.");
    }
    /**
     * Commits the given offset for one topic partition, reconnecting to the
     * consumer-offset coordinator reported by the contacted broker.
     *
     * @param brokerName host name of the broker to contact first
     * @param portNo port of that broker
     * @param consumerGroupName consumer group the offset belongs to
     * @param topicName topic to commit the offset for
     * @param partition partition within the topic
     * @param offset offset to commit
     * @return true on a successful commit, false otherwise
     */
    private static boolean commitOffset(String brokerName, int portNo,
            String consumerGroupName, String topicName, int partition,
            long offset) {
        BlockingChannel channel = new BlockingChannel(brokerName, portNo,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */);
        channel.connect();
        try {
            int correlationId = 0;
            final TopicAndPartition topicAndPartition = new TopicAndPartition(
                    topicName, partition);
            // Ask the broker for the offset coordinator of this consumer group.
            channel.send(new ConsumerMetadataRequest(consumerGroupName,
                    ConsumerMetadataRequest.CurrentVersion(), correlationId++,
                    CLIENT_ID));
            ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse
                    .readFrom(channel.receive().buffer());
            if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
                Broker offsetManager = metadataResponse.coordinator();
                // Reconnect to the coordinator, which may be a different broker
                // than the one this channel currently points at. If the lookup
                // failed, fall through and try the commit on the current broker.
                channel.disconnect();
                channel = new BlockingChannel(offsetManager.host(),
                        offsetManager.port(),
                        BlockingChannel.UseDefaultBufferSize(),
                        BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */);
                channel.connect();
            }
            long now = System.currentTimeMillis();
            Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
            offsets.put(topicAndPartition, new OffsetAndMetadata(offset,
                    "associated metadata", now));
            OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                    consumerGroupName, offsets, correlationId++, CLIENT_ID,
                    (short) 1 /* version 1: offsets stored in Kafka, not ZooKeeper */);
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse
                    .readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                LOG.error("Error occurred while committing offset.");
                for (Object partitionErrorCode : commitResponse.errors().values()) {
                    if ((short) partitionErrorCode == ErrorMapping
                            .NotCoordinatorForConsumerCode()
                            || (short) partitionErrorCode == ErrorMapping
                                    .ConsumerCoordinatorNotAvailableCode()) {
                        // Stale coordinator; the caller retries against another
                        // broker and rediscovers the coordinator.
                    }
                    else {
                        LOG.error("Unable to commit offset to kafka topic="
                                + topicName + " for partition=" + partition
                                + " due to errorCode=" + partitionErrorCode);
                    }
                }
                return false;
            }
            LOG.info("Committed the offset successfully.");
            return true;
        }
        finally {
            // Always release the socket, on both success and failure paths.
            channel.disconnect();
        }
    }
}
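For context, here is a minimal sketch of how this class might be driven from a consumer loop. The broker list, topic, group name, and the partitionId/messageOffset values below are illustrative placeholders, and the JobStatus enum (with at least RUNNING and FINISHED values) is referenced by the class but not included in this gist:

    OffsetManager offsetManager = new OffsetManager(
            "broker1:9092,broker2:9092", "my-topic", "my-consumer-group",
            30, TimeUnit.SECONDS);
    offsetManager.startOffsetUpdates();

    int partitionId = 0;        // e.g. from the fetched message
    long messageOffset = 42L;   // e.g. from the fetched message

    // When a message is handed off to an async job:
    offsetManager.addOffsetForPartition(partitionId, messageOffset);

    // When that job completes:
    offsetManager.updateOffsetForPartition(partitionId, messageOffset);

    // On shutdown:
    offsetManager.stopOffsetUpdates();

Since the background committer only advances past a contiguous run of FINISHED offsets, a slow or failed job on an early offset holds back the commit for its partition; on restart the consumer replays from that committed offset, giving at-least-once delivery.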