Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Created July 16, 2013 21:24
Show Gist options
  • Select an option

  • Save Randgalt/6015299 to your computer and use it in GitHub Desktop.

Select an option

Save Randgalt/6015299 to your computer and use it in GitHub Desktop.
package org.apache.curator.framework.recipes.leader;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
public class TestRolling
{
private final TestingCluster cluster;
private final List<CuratorFramework> clients = Lists.newArrayList();
private final List<LeaderSelector> selectors = Lists.newArrayList();
private final AtomicReference<Integer> leader = new AtomicReference<Integer>(0);
private static final int SERVER_QTY = 3;
private static final int CLIENT_QTY = 2;
public static void main(String[] args) throws Exception
{
System.setProperty("log4j.configuration", "");
TestRolling testRolling = new TestRolling();
testRolling.test();
Thread.currentThread().join();
}
public TestRolling()
{
cluster = new TestingCluster(SERVER_QTY);
for ( int i = 0; i < CLIENT_QTY; ++i )
{
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(cluster.getConnectString()).retryPolicy(new RetryOneTime(1)).sessionTimeoutMs(1000).build();
clients.add(client);
final int id = i;
LeaderSelectorListener listener = new LeaderSelectorListener()
{
private final Semaphore lostSemaphore = new Semaphore(0);
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
Integer previousId = leader.getAndSet(id);
try
{
if ( previousId != 0 )
{
System.err.println(String.format("!! More than one leader! Us=%d - Other=%d", id, previousId));
}
System.err.println(String.format("%d takes leadership", id));
lostSemaphore.acquire();
}
finally
{
if ( !leader.compareAndSet(id, 0) )
{
System.err.println(String.format("!! Could not clear leader value"));
}
System.err.println(String.format("%d is exiting", id));
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( (newState == ConnectionState.LOST) || (newState == ConnectionState.SUSPENDED) )
{
if ( leader.get() == id )
{
System.err.println(String.format("%d is losing leadership", id));
lostSemaphore.release();
}
}
}
};
LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
selector.setId(Integer.toString(i));
selectors.add(selector);
}
}
private void restartOneServer(int index) throws Exception
{
System.err.println(String.format("Stopping server index %d", index));
List<InstanceSpec> instanceSpecs = Lists.newArrayList(cluster.getInstances());
InstanceSpec instanceSpec = instanceSpecs.get(index);
if ( !cluster.killServer(instanceSpec) )
{
System.err.println(String.format("!! Could not kill server index %d", index));
}
else
{
cluster.restartServer(instanceSpec);
}
}
private void test() throws Exception
{
cluster.start();
for ( CuratorFramework client : clients )
{
client.start();
}
for ( LeaderSelector selector : selectors )
{
selector.autoRequeue();
selector.start();
}
for (int i = 0; i < 100000; i++)
{
Thread.sleep(3000);
int index = (i % SERVER_QTY);
restartOneServer(index);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment