Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Last active October 2, 2015 00:06
Show Gist options
  • Save Randgalt/c3358b740f7f09d47bb0 to your computer and use it in GitHub Desktop.
Save Randgalt/c3358b740f7f09d47bb0 to your computer and use it in GitHub Desktop.
package test;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.File;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class TestZKSaveConnLoss
{
static TestingServer server;
static ZooKeeper zooKeeper;
private final static String path = "/test";
private final static String dataStr = "test";
static Thread clientThread, serverThread;
public static void main(String[] args) throws Exception {
server = new TestingServer();
final CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
if ( event.getState() == Event.KeeperState.SyncConnected )
{
latch.countDown();
}
}
};
zooKeeper = new ZooKeeper(server.getConnectString(), 30 * 1000, watcher);
latch.await();
log("starting");
start();
Thread.sleep(100000);
}
private static void start() {
serverThread = new Thread() {
public void run() {
while (true) {
try {
log("Restarting server...");
int port = server.getPort();
File tempDirectory = server.getTempDirectory();
server.stop();
server = new TestingServer(port, tempDirectory);
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
fail("Failed to restart TestingServer", e);
}
}
}
};
serverThread.start();
clientThread = new Thread() {
public void run() {
while (true) {
try {
log("Recreating node...");
if (checkExists() != null) {
log("Deleting " + path);
delete();
log(path + " has been deleted.");
}
if (checkExists() != null) {
fail("ZK node still here!", null);
} else {
log("Node " + path
+ " has successfully been removed.");
}
log("Recreating " + path);
try {
create();
} catch (NodeExistsException e) {
log("ERROR: node should be removed.");
byte[] data = getData();
log("Data of node is: " + new String(data));
fail("Node has been mysteriously created...", e);
}
log(path + " has been created.");
byte[] data = getData();
String content = new String(data);
if (!dataStr.equals(content)) {
fail("Data doesn't match: " + content, null);
} else {
log("Data has been verified.");
}
} catch (Exception e) {
fail("Client error", e);
}
}
}
};
clientThread.start();
try {
serverThread.join();
clientThread.join();
} catch (InterruptedException e) {
log("Interrupted.");
System.exit(1);
}
}
private static byte[] getData() throws Exception
{
for(;;)
{
try
{
return zooKeeper.getData(path, false, null);
}
catch ( KeeperException.ConnectionLossException e )
{
Thread.sleep(1000);
}
}
}
private static void create() throws Exception
{
for(;;)
{
try
{
zooKeeper.create(path, dataStr.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
break;
}
catch ( KeeperException.ConnectionLossException e )
{
Thread.sleep(1000);
}
}
}
private static void delete() throws Exception
{
for(;;)
{
try
{
zooKeeper.delete(path, -1);
break;
}
catch ( KeeperException.ConnectionLossException e )
{
Thread.sleep(1000);
}
}
}
private static Stat checkExists() throws Exception
{
for(;;)
{
try
{
return zooKeeper.exists(path, false);
}
catch ( KeeperException.ConnectionLossException e )
{
Thread.sleep(1000);
}
}
}
private static void log(String msg) {
System.out.println(new Date() + " - " + msg);
}
private static void fail(String msg, Exception e) {
System.err.println(msg);
if (e != null) {
e.printStackTrace();
}
System.exit(1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment