-
-
Save rsumbaly/1329607 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kafka.schemaregistry.util | |
import org.I0Itec.zkclient.ZkClient | |
import java.util.concurrent.atomic.AtomicBoolean | |
import org.apache.log4j.Logger | |
import org.apache.zookeeper.CreateMode | |
import java.util.concurrent.Callable | |
import org.I0Itec.zkclient.exception.{ZkInterruptedException, ZkException} | |
trait LockListener { | |
def lockAcquired | |
def lockReleased | |
} | |
trait ZkOperation { | |
def execute: Boolean | |
} | |
case class ZNodeName(name: String) extends Comparable[ZNodeName] { | |
private val logger = Logger.getLogger(getClass) | |
if (name == null) | |
throw new NullPointerException("Id cannot be null") | |
var prefix: String = name | |
var idx: Int = name.lastIndexOf('-') | |
var sequence: Int = -1 | |
if (idx >= 0) { | |
prefix = name.substring(0, idx) | |
try { | |
sequence = Integer.parseInt(name.substring(idx + 1)) | |
} catch { | |
case e1: NumberFormatException => { | |
logger.info("Number format exception for " + idx, e1) | |
} | |
case e2: ArrayIndexOutOfBoundsException => { | |
logger.info("Array out of bounds for " + idx, e2) | |
} | |
} | |
} | |
override def toString: String = name.toString | |
override def compareTo(that: ZNodeName): Int = { | |
var answer: Int = prefix.compareTo(that.prefix) | |
if (answer == 0) { | |
val s1: Int = this.sequence | |
val s2: Int = that.sequence | |
if (s1 == -1 && s2 == -1) | |
this.name.compareTo(that.name) | |
answer = if (s1 == -1) 1 else if (s2 == -1) -1 else s1 - s2 | |
} | |
answer | |
} | |
} | |
class WriteLock(zkClient: ZkClient, dir: String, lockListener: LockListener) { | |
class LockZkOperation extends ZkOperation { | |
override def execute: Boolean = { | |
do { | |
if (id == null) { | |
val sessionId: Long = zkClient. | |
} | |
} while (id == null) | |
return false | |
} | |
} | |
private val logger = Logger.getLogger(getClass) | |
val zop: LockZkOperation = new LockZkOperation() | |
var id: String = null | |
val ownerId: String = null | |
val closed: AtomicBoolean = new AtomicBoolean(false) | |
val retryDelayMs: Long = 500L | |
val retryCount: Int = 10 | |
def close = closed.compareAndSet(false, true) | |
def isClosed = closed.get | |
def retryOperation(operation: ZkOperation): Boolean = { | |
var exception: Exception = null | |
for (i <- 0 until retryCount) { | |
try { | |
return operation.execute | |
} catch { | |
case e: ZkException => { | |
if (exception != null) exception = e | |
if (i > 0) { | |
try { | |
Thread.sleep(i * retryDelayMs) | |
} catch { | |
case e1: InterruptedException => logger.debug("Failed to sleep ", e1) | |
} | |
} | |
} | |
} | |
} | |
throw exception | |
} | |
def ensurePathExists(path: String) = { | |
retryOperation(new ZkOperation() { | |
override def execute() = { | |
if (!zkClient.exists(path)) | |
zkClient.createPersistent(path, true) | |
} | |
}) | |
} | |
def isOwner = id != null && ownerId != null && id.equals(ownerId) | |
def unlock = { | |
synchronized { | |
if (!isClosed && id != null) { | |
try { | |
zkClient.delete(id) | |
} catch { | |
case e: ZkInterruptedException => { | |
logger.warn("Caught interrupted exception ", e) | |
Thread.currentThread.interrupt | |
} | |
case e1 => throw e1 | |
} finally { | |
if (lockListener != null) { | |
lockListener.lockReleased | |
} | |
id = null | |
} | |
} | |
} | |
} | |
def lock: Boolean = { | |
synchronized({ | |
if (isClosed) false | |
// Check if path exists | |
ensurePathExists(dir) | |
// Run the actual locking operation | |
retryOperation(zop) | |
}) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kafka.schemaregistry.util | |
import junit.framework.Assert._ | |
import org.junit.Test | |
import org.scalatest.junit.JUnit3Suite | |
import java.util.TreeSet | |
class ZNodeNameTest extends JUnit3Suite { | |
@Test | |
def testOrderWithSamePrefix() { | |
val names: Array[String] = Array("x-3", "x-5", "x-11", "x-1") | |
val expected: Array[String] = Array("x-1", "x-3", "x-5", "x-11") | |
assertOrderedNodeNames(names, expected); | |
} | |
@Test | |
def testOrderWithDifferentPrefixes() { | |
val names: Array[String] = Array("r-3", "r-2", "r-1", "w-2", "w-1") | |
val expected: Array[String] = Array("r-1", "r-2", "r-3", "w-1", "w-2") | |
assertOrderedNodeNames(names, expected); | |
} | |
private def assertOrderedNodeNames(names: Array[String], expected: Array[String]) { | |
import scala.collection.JavaConversions._ | |
val size: Int = names.length; | |
assertEquals("The two arrays should be the same size!", names.length, expected.length); | |
var nodeNames: TreeSet[ZNodeName] = new TreeSet[ZNodeName]() | |
names.foreach(name => { | |
nodeNames.add(new ZNodeName(name)) | |
}) | |
var index: Int = 0; | |
val iter: Iterator[ZNodeName] = nodeNames.iterator | |
while (iter.hasNext) { | |
val nodeName: ZNodeName = iter.next | |
assertEquals("Node " + index, expected(index), nodeName.name) | |
index = index + 1 | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment