Last active
April 9, 2021 15:50
-
-
Save quux00/f6be8fe223a7832ef514 to your computer and use it in GitHub Desktop.
Test of ZK Curator InterProcessMutex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package quux00.curator; | |
import java.util.Collection; | |
import java.util.Iterator; | |
import java.util.UUID; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.curator.RetryPolicy; | |
import org.apache.curator.framework.CuratorFramework; | |
import org.apache.curator.framework.CuratorFrameworkFactory; | |
import org.apache.curator.framework.api.CuratorEvent; | |
import org.apache.curator.framework.api.CuratorEventType; | |
import org.apache.curator.framework.api.CuratorListener; | |
import org.apache.curator.framework.api.UnhandledErrorListener; | |
import org.apache.curator.framework.recipes.locks.InterProcessMutex; | |
import org.apache.curator.framework.recipes.locks.RevocationListener; | |
import org.apache.curator.framework.recipes.locks.Revoker; | |
import org.apache.curator.framework.state.ConnectionState; | |
import org.apache.curator.framework.state.ConnectionStateListener; | |
import org.apache.curator.retry.ExponentialBackoffRetry; | |
import org.apache.curator.utils.CloseableUtils; | |
import org.apache.zookeeper.WatchedEvent; | |
import org.apache.zookeeper.Watcher.Event.EventType; | |
public class TryOutInterProcessMutex implements RevocationListener<InterProcessMutex>, ConnectionStateListener, UnhandledErrorListener, CuratorListener { | |
static int maxWait = 10; | |
static String lockPath = "/masterlock"; | |
static String namespace = "XXX"; | |
public static void main(String[] args) throws Exception { | |
String conxString = "trvlcdat2ap11.tsh.thomson.com:2181"; | |
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); | |
CuratorFramework zkc = CuratorFrameworkFactory.builder(). | |
namespace(namespace). | |
retryPolicy(retryPolicy). | |
connectString(conxString). | |
build(); | |
zkc.start(); | |
new TryOutInterProcessMutex(zkc, lockPath).engage(); | |
} | |
/* ---[ instance fields and methods ]--- */ | |
private final CuratorFramework client; | |
private final InterProcessMutex dlock; | |
private final UUID uuid = UUID.randomUUID(); | |
private volatile boolean lockRequested; | |
private volatile boolean isLeader = false; | |
private boolean suspendOperation = false; | |
private boolean connectionLost = false; | |
private CustomLockInternalsDriver clid = new CustomLockInternalsDriver(); | |
public TryOutInterProcessMutex(CuratorFramework zkc, String lockPath) { | |
client = zkc; | |
dlock = new InterProcessMutex(zkc, lockPath, clid); | |
dlock.makeRevocable(this); | |
zkc.getConnectionStateListenable().addListener(this); | |
zkc.getUnhandledErrorListenable().addListener(this); | |
zkc.getCuratorListenable().addListener(this); | |
lockRequested = false; | |
} | |
private void engage() throws Exception, InterruptedException { | |
for (int i = 0; i < 10; i++) { | |
if (dlock.acquire(maxWait, TimeUnit.SECONDS)) { | |
isLeader = true; | |
// try setting watch on my own lock path | |
String ownLockPath = clid.getOwnLockPath(); | |
client.checkExists().watched().inBackground().forPath(ownLockPath); | |
try { | |
System.out.println(uuid + " has acquired lock"); | |
for (int j = 0; j < 60; j++) { | |
if (!isLeader) { | |
System.out.println("No longer leader -- stopping work"); | |
break; | |
} | |
if (lockRequested) { | |
System.out.println("revocation was requested - so releasing the lock early"); | |
break; | |
} | |
if (suspendOperation) { | |
System.out.println("***** would now suspend operation"); | |
} | |
if (connectionLost) { | |
System.out.println("*#*#*#*# SHUTDOWN signalled"); | |
break; | |
} | |
// if get here, safe to do a round of work | |
System.out.println("%% working %%"); // \ placeholder for doing | |
Thread.sleep(1000); // / work in cycles | |
} | |
} finally { | |
System.out.println(">>>> Releasing lock"); | |
if (!connectionLost) { | |
dlock.release(); // THIS HANGS if you don't have a connection | |
} | |
System.out.println(">>>>### Closing client"); | |
System.out.flush(); | |
CloseableUtils.closeQuietly(client); | |
} | |
break; | |
} else { | |
System.out.println(uuid + " didn't get lock; attempt: " + i); | |
Collection<String> participantNodes = dlock.getParticipantNodes(); | |
System.out.println("participantNodes: " + participantNodes); | |
if (i == 3 || i == 7) { | |
Iterator<String> iter = participantNodes.iterator(); | |
if (iter.hasNext()) { | |
String fullLockPath = iter.next(); | |
if (fullLockPath != null) { | |
System.out.println("Going to attempt revoke on " + fullLockPath); | |
Revoker.attemptRevoke(client, fullLockPath); | |
System.out.println("Revoke attempt done ..."); | |
} | |
} | |
} | |
} | |
} | |
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CLOSING CLIENT"); | |
CloseableUtils.closeQuietly(client); | |
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DONE"); | |
} | |
public void revocationRequested(InterProcessMutex mutex) { | |
System.out.println("### RevocationRequested: " + mutex.toString()); | |
lockRequested = true; | |
} | |
public void stateChanged(CuratorFramework zkc, ConnectionState conxState) { | |
System.out.println(">>>>> STATE CHANGED!!: " + conxState); | |
switch (conxState) { | |
case SUSPENDED: | |
suspendOperation = true; | |
break; | |
case RECONNECTED: | |
suspendOperation = false; | |
break; | |
case LOST: | |
connectionLost = true; | |
break; | |
default: | |
// do nothing | |
break; | |
} | |
} | |
public void unhandledError(String arg0, Throwable arg1) { | |
System.out.printf(">> ... Unhandled error. arg0: %s; arg1: %s\n", arg0, arg1); | |
} | |
public void eventReceived(CuratorFramework arg0, CuratorEvent event) throws Exception { | |
System.out.println("eventRd:: " + event.getName() + "; type=" + event.getType() ); | |
if (event.getType() == CuratorEventType.WATCHED) { | |
WatchedEvent watchedEvent = event.getWatchedEvent(); | |
System.out.printf("eventRd2: %s | %s | %s\n", watchedEvent, watchedEvent.getType(), watchedEvent.getPath()); | |
if (watchedEvent.getType() == EventType.NodeDeleted) { | |
if (watchedEvent.getPath().equals(clid.getOwnLockPath())) { | |
System.out.println("eventRd3: .... the watched node has been deleted => signalling"); | |
isLeader = false; | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment