Skip to content

Instantly share code, notes, and snippets.

@quux00
Last active April 9, 2021 15:50
Show Gist options
  • Save quux00/f6be8fe223a7832ef514 to your computer and use it in GitHub Desktop.
Save quux00/f6be8fe223a7832ef514 to your computer and use it in GitHub Desktop.
Test of ZK Curator InterProcessMutex
package quux00.curator;
import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.RevocationListener;
import org.apache.curator.framework.recipes.locks.Revoker;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
public class TryOutInterProcessMutex implements RevocationListener<InterProcessMutex>, ConnectionStateListener, UnhandledErrorListener, CuratorListener {
static int maxWait = 10;
static String lockPath = "/masterlock";
static String namespace = "XXX";
public static void main(String[] args) throws Exception {
String conxString = "trvlcdat2ap11.tsh.thomson.com:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zkc = CuratorFrameworkFactory.builder().
namespace(namespace).
retryPolicy(retryPolicy).
connectString(conxString).
build();
zkc.start();
new TryOutInterProcessMutex(zkc, lockPath).engage();
}
/* ---[ instance fields and methods ]--- */
private final CuratorFramework client;
private final InterProcessMutex dlock;
private final UUID uuid = UUID.randomUUID();
private volatile boolean lockRequested;
private volatile boolean isLeader = false;
private boolean suspendOperation = false;
private boolean connectionLost = false;
private CustomLockInternalsDriver clid = new CustomLockInternalsDriver();
public TryOutInterProcessMutex(CuratorFramework zkc, String lockPath) {
client = zkc;
dlock = new InterProcessMutex(zkc, lockPath, clid);
dlock.makeRevocable(this);
zkc.getConnectionStateListenable().addListener(this);
zkc.getUnhandledErrorListenable().addListener(this);
zkc.getCuratorListenable().addListener(this);
lockRequested = false;
}
private void engage() throws Exception, InterruptedException {
for (int i = 0; i < 10; i++) {
if (dlock.acquire(maxWait, TimeUnit.SECONDS)) {
isLeader = true;
// try setting watch on my own lock path
String ownLockPath = clid.getOwnLockPath();
client.checkExists().watched().inBackground().forPath(ownLockPath);
try {
System.out.println(uuid + " has acquired lock");
for (int j = 0; j < 60; j++) {
if (!isLeader) {
System.out.println("No longer leader -- stopping work");
break;
}
if (lockRequested) {
System.out.println("revocation was requested - so releasing the lock early");
break;
}
if (suspendOperation) {
System.out.println("***** would now suspend operation");
}
if (connectionLost) {
System.out.println("*#*#*#*# SHUTDOWN signalled");
break;
}
// if get here, safe to do a round of work
System.out.println("%% working %%"); // \ placeholder for doing
Thread.sleep(1000); // / work in cycles
}
} finally {
System.out.println(">>>> Releasing lock");
if (!connectionLost) {
dlock.release(); // THIS HANGS if you don't have a connection
}
System.out.println(">>>>### Closing client");
System.out.flush();
CloseableUtils.closeQuietly(client);
}
break;
} else {
System.out.println(uuid + " didn't get lock; attempt: " + i);
Collection<String> participantNodes = dlock.getParticipantNodes();
System.out.println("participantNodes: " + participantNodes);
if (i == 3 || i == 7) {
Iterator<String> iter = participantNodes.iterator();
if (iter.hasNext()) {
String fullLockPath = iter.next();
if (fullLockPath != null) {
System.out.println("Going to attempt revoke on " + fullLockPath);
Revoker.attemptRevoke(client, fullLockPath);
System.out.println("Revoke attempt done ...");
}
}
}
}
}
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CLOSING CLIENT");
CloseableUtils.closeQuietly(client);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DONE");
}
public void revocationRequested(InterProcessMutex mutex) {
System.out.println("### RevocationRequested: " + mutex.toString());
lockRequested = true;
}
public void stateChanged(CuratorFramework zkc, ConnectionState conxState) {
System.out.println(">>>>> STATE CHANGED!!: " + conxState);
switch (conxState) {
case SUSPENDED:
suspendOperation = true;
break;
case RECONNECTED:
suspendOperation = false;
break;
case LOST:
connectionLost = true;
break;
default:
// do nothing
break;
}
}
public void unhandledError(String arg0, Throwable arg1) {
System.out.printf(">> ... Unhandled error. arg0: %s; arg1: %s\n", arg0, arg1);
}
public void eventReceived(CuratorFramework arg0, CuratorEvent event) throws Exception {
System.out.println("eventRd:: " + event.getName() + "; type=" + event.getType() );
if (event.getType() == CuratorEventType.WATCHED) {
WatchedEvent watchedEvent = event.getWatchedEvent();
System.out.printf("eventRd2: %s | %s | %s\n", watchedEvent, watchedEvent.getType(), watchedEvent.getPath());
if (watchedEvent.getType() == EventType.NodeDeleted) {
if (watchedEvent.getPath().equals(clid.getOwnLockPath())) {
System.out.println("eventRd3: .... the watched node has been deleted => signalling");
isLeader = false;
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment