Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Last active December 19, 2015 20:19
Show Gist options
  • Save Randgalt/6012358 to your computer and use it in GitHub Desktop.
Save Randgalt/6012358 to your computer and use it in GitHub Desktop.
// Shared flag flipped to true via compareAndSet when a listener enters takeLeadership and
// set back to false when it leaves; a failed compareAndSet is logged as
// "Another process has leadership".
final AtomicBoolean hasLeadership = new AtomicBoolean(false);
/**
 * Creates, configures and starts a {@link LeaderSelector} on the {@code "/leader"} path.
 *
 * <p>The selector's id is set to the decimal string of {@code id} and auto-requeue is
 * enabled before it is started. While the listener holds leadership it blocks on a latch;
 * a {@code LOST} or {@code SUSPENDED} connection state releases that latch so that
 * {@code takeLeadership} returns.
 *
 * @param client the Curator client handed to the selector
 * @param id     numeric id of this participant; recorded in {@code leader} while it leads
 * @return the started selector
 * @throws Exception declared by the original signature; propagated from selector setup
 */
public LeaderSelector startParticipant(CuratorFramework client, int id) throws Exception {
    final int myid = id;
    LeaderSelectorListener listener = new LeaderSelectorListener()
    {
        // Non-null only while takeLeadership is blocked; counting it down releases leadership.
        volatile CountDownLatch holdLatch;

        @Override
        public void takeLeadership(CuratorFramework client) throws Exception
        {
            // Keep a local reference so the await below never depends on the shared field.
            final CountDownLatch latch = new CountDownLatch(1);
            holdLatch = latch;
            // Remember whether THIS call flipped the flag, so cleanup does not clear a
            // flag that is owned by a different, still-active leader.
            final boolean flagAcquired = hasLeadership.compareAndSet(false, true);
            try
            {
                if ( !flagAcquired )
                {
                    log("Another process has leadership. Me:" + myid + " - Leader:" + leader.get());
                }
                int previous_leader = leader.getAndSet(myid);
                log(myid + " takes leadership. Leader was:" + previous_leader);
                // Hold leadership until stateChanged counts the latch down (or await throws).
                latch.await();
            }
            finally
            {
                holdLatch = null;
                if ( flagAcquired )
                {
                    hasLeadership.set(false);
                }
            }
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            if ((newState == ConnectionState.LOST) || (newState == ConnectionState.SUSPENDED))
            {
                // Read the volatile field once: takeLeadership's finally block may null it
                // between a null check and the countDown() call.
                final CountDownLatch latch = holdLatch;
                if ( latch != null )
                {
                    latch.countDown();
                }
                // NOTE(review): get-then-set is not atomic; if `leader` is an AtomicInteger,
                // compareAndSet(myid, -1) would avoid overwriting a newer leader's id - confirm.
                if (leader.get() == myid)
                {
                    try
                    {
                        log(myid + " looses leadership.");
                    }
                    catch (Exception ignored)
                    {
                        // Logging is best-effort here; the leader marker must still be reset below.
                    }
                    leader.set(-1);
                }
            }
        }
    };
    LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
    selector.setId("" + myid);
    selector.autoRequeue();
    selector.start();
    return selector;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment