Created
May 10, 2012 16:41
-
-
Save mardambey/2654382 to your computer and use it in GitHub Desktop.
Reset Kafka offsets in ZooKeeper by deleting the corresponding nodes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Notes: This code uses AsyncValue[T], a custom class that uses actors | |
* to allow concurrent operations on the provided type. It can be replaced | |
* by an Atomic object from the java.util.concurrent package or something | |
* that provides similar functionality. | |
*/ | |
/** | |
* Resets the offsets for the given group / topic pair. | |
* This works by deleting the offsets recorded in | |
* ZooKeeper for that pair. | |
*/ | |
def resetOffset(groupId:String, topic:String) { | |
val zkHost = getZkForTopic(topic.toString) // if we can't get it we're going to fail | |
zkHost match { | |
case Some(host) => { | |
val connected = new AsyncValue[Boolean](false) | |
val zk = new ZooKeeper(host, 10000, new ZkWatcher(connected)) | |
var retry = 10 | |
while(!connected.get() && retry > 0) { | |
Thread.sleep(100) | |
retry = retry - 1 | |
} | |
if (connected.get()) { | |
getOffsetPaths(groupId, topic, zk).foreach(path => { | |
try { | |
log.fine("Deleting path from zk: " + path) | |
zk.delete(path, -1) // no need for version match hence -1 | |
} catch { | |
case e:Exception => log.severe("Could not delete path " + path + " from zk") | |
} | |
}) | |
} else { | |
// failed to connect | |
log.severe("Failed to connect to zookeeper on " + zkHost) | |
} | |
zk.close() | |
} | |
case _ => { | |
log.severe("Could not reset offset for group " + groupId + ", topic " + topic + ": no zkHost found") | |
} | |
} | |
} | |
/** | |
* Returns a list containing the full paths | |
* in ZooKeeper of the offsets for the given | |
* group / topic pair. | |
*/ | |
def getOffsetPaths(groupId:String, topic:String, zk:ZooKeeper) : List[String] = { | |
// build path where offsets are stored, no need for zk watch (hence false) | |
val parent = "/consumers/%s/offsets/%s".format(groupId, topic) | |
try { zk.getChildren(parent, false).map("%s/%s".format(parent, _)).toList } | |
catch { case _ => List[String]() } | |
} | |
/** | |
* A very simple ZooKeeper watcher that notifies | |
* its caller by means of an AsyncValue that the | |
* connection to ZooKeeper is now ready. | |
*/ | |
private class ZkWatcher(connected:AsyncValue[Boolean]) extends Watcher { | |
def process(event:WatchedEvent) { | |
event.getType() match { | |
case Event.EventType.None if event.getState() == Event.KeeperState.SyncConnected => { | |
// We are are being told that the state of the connection has changed to SyncConnected | |
connected.set(true) | |
} | |
case _ => | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment