Last active
October 11, 2015 23:28
-
-
Save ericacm/3936160 to your computer and use it in GitHub Desktop.
start Zookeeper server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// hosts is in the form host:port,host:port | |
@Value("${zookeeperService.hosts:localhost}") | |
@ManagedGetter @BeanProperty | |
var hosts: String = _ | |
val hostsArr = hosts.split(",") | |
val serversKeyArr = new Array[String](hostsArr.length) | |
val serversValArr = new Array[String](hostsArr.length) | |
for ((server, idx) <- hostsArr.zipWithIndex) { | |
val (host, port) = if (server.contains(":")) { | |
val parts = server.split(":") | |
(parts(0), parts(1).toInt) | |
} else { | |
(server, clientPort) | |
} | |
hostsArr(idx) = host + ":" + port | |
serversKeyArr(idx) = "server." + (idx+1) | |
serversValArr(idx) = host + ":" + (port+1) + ":" + (port+2) | |
} | |
val connectString = hostsArr.mkString(",") | |
def startServer( | |
hostsArr: Array[String], | |
serversKeyArr: Array[String], | |
serversValArr: Array[String]) { | |
if (dataDir == null || dataDir == "") { | |
dataDir = "./zookeeper-" + nodeId | |
} | |
val dir = new File(dataDir) | |
val startupProperties = new Properties() | |
val useQuorum = hostsArr.length > 1 | |
if (useQuorum) { | |
for ((server, idx) <- hostsArr.zipWithIndex) { | |
startupProperties.put(serversKeyArr(idx), serversValArr(idx)) | |
if (server == nodeId) { | |
val myidFile = new File(dir, "myid") | |
val fw = new FileWriter(myidFile) | |
fw.write((idx+1).toString + "\n") | |
fw.close() | |
} | |
} | |
} | |
startupProperties.put("tickTime", "1000") | |
startupProperties.put("initLimit", "10") | |
startupProperties.put("syncLimit", "5") | |
startupProperties.put("dataDir", dataDir) | |
startupProperties.put("clientPort", clientPort.toString) | |
startupProperties.put("skipACL", "true") | |
startupProperties.put("autopurge.purgeInterval", "24") | |
val serverStartFunc: Function0[Unit] = | |
if (useQuorum) { // Replicated Zookeeper | |
val quorumConfiguration = new QuorumPeerConfig | |
quorumConfiguration.parseProperties(startupProperties) | |
quorumPeerMain = new QuorumPeerMain | |
val startFunc = { () => | |
quorumPeerMain.runFromConfig(quorumConfiguration) | |
} | |
startFunc | |
} else { // Standalone Zookeeper | |
val quorumConfiguration = new QuorumPeerConfig | |
quorumConfiguration.parseProperties(startupProperties) | |
val configuration = new ServerConfig | |
configuration.readFrom(quorumConfiguration) | |
zkServerMain = new ZooKeeperServerMain | |
val startFunc = { () => | |
zkServerMain.runFromConfig(configuration) | |
} | |
startFunc | |
} | |
val serverStarted = new Semaphore(0) | |
var serverStartException: Option[Exception] = None | |
startThread("startZookeeper") { | |
try { | |
log.info("Starting ZooKeeper server. clientPort=" + clientPort + " replicated=" + useQuorum) | |
serverStartFunc() | |
serverStarted.release() | |
} catch { | |
case ioex: IOException => | |
log.error("ZooKeeper Failed - data directory corruption: " + ioex.getMessage, ioex) | |
serverStartException = Some(ioex) | |
case ex: Exception => | |
log.error("ZooKeeper Failed: " + ex.getMessage, ex) | |
serverStartException = Some(ex) | |
} | |
} | |
if (!serverStarted.tryAcquire(waitingSec, TimeUnit.SECONDS)) { | |
serverStartException match { | |
case Some(ex) => throw new RuntimeException(ex) | |
case None => | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment