Created
July 16, 2013 21:24
-
-
Save Randgalt/6015299 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.apache.curator.framework.recipes.leader; | |
| import com.google.common.collect.Lists; | |
| import org.apache.curator.framework.CuratorFramework; | |
| import org.apache.curator.framework.CuratorFrameworkFactory; | |
| import org.apache.curator.framework.state.ConnectionState; | |
| import org.apache.curator.retry.RetryOneTime; | |
| import org.apache.curator.test.InstanceSpec; | |
| import org.apache.curator.test.TestingCluster; | |
| import java.util.List; | |
| import java.util.concurrent.Semaphore; | |
| import java.util.concurrent.atomic.AtomicReference; | |
| public class TestRolling | |
| { | |
| private final TestingCluster cluster; | |
| private final List<CuratorFramework> clients = Lists.newArrayList(); | |
| private final List<LeaderSelector> selectors = Lists.newArrayList(); | |
| private final AtomicReference<Integer> leader = new AtomicReference<Integer>(0); | |
| private static final int SERVER_QTY = 3; | |
| private static final int CLIENT_QTY = 2; | |
| public static void main(String[] args) throws Exception | |
| { | |
| System.setProperty("log4j.configuration", ""); | |
| TestRolling testRolling = new TestRolling(); | |
| testRolling.test(); | |
| Thread.currentThread().join(); | |
| } | |
| public TestRolling() | |
| { | |
| cluster = new TestingCluster(SERVER_QTY); | |
| for ( int i = 0; i < CLIENT_QTY; ++i ) | |
| { | |
| CuratorFramework client = CuratorFrameworkFactory.builder().connectString(cluster.getConnectString()).retryPolicy(new RetryOneTime(1)).sessionTimeoutMs(1000).build(); | |
| clients.add(client); | |
| final int id = i; | |
| LeaderSelectorListener listener = new LeaderSelectorListener() | |
| { | |
| private final Semaphore lostSemaphore = new Semaphore(0); | |
| @Override | |
| public void takeLeadership(CuratorFramework client) throws Exception | |
| { | |
| Integer previousId = leader.getAndSet(id); | |
| try | |
| { | |
| if ( previousId != 0 ) | |
| { | |
| System.err.println(String.format("!! More than one leader! Us=%d - Other=%d", id, previousId)); | |
| } | |
| System.err.println(String.format("%d takes leadership", id)); | |
| lostSemaphore.acquire(); | |
| } | |
| finally | |
| { | |
| if ( !leader.compareAndSet(id, 0) ) | |
| { | |
| System.err.println(String.format("!! Could not clear leader value")); | |
| } | |
| System.err.println(String.format("%d is exiting", id)); | |
| } | |
| } | |
| @Override | |
| public void stateChanged(CuratorFramework client, ConnectionState newState) | |
| { | |
| if ( (newState == ConnectionState.LOST) || (newState == ConnectionState.SUSPENDED) ) | |
| { | |
| if ( leader.get() == id ) | |
| { | |
| System.err.println(String.format("%d is losing leadership", id)); | |
| lostSemaphore.release(); | |
| } | |
| } | |
| } | |
| }; | |
| LeaderSelector selector = new LeaderSelector(client, "/leader", listener); | |
| selector.setId(Integer.toString(i)); | |
| selectors.add(selector); | |
| } | |
| } | |
| private void restartOneServer(int index) throws Exception | |
| { | |
| System.err.println(String.format("Stopping server index %d", index)); | |
| List<InstanceSpec> instanceSpecs = Lists.newArrayList(cluster.getInstances()); | |
| InstanceSpec instanceSpec = instanceSpecs.get(index); | |
| if ( !cluster.killServer(instanceSpec) ) | |
| { | |
| System.err.println(String.format("!! Could not kill server index %d", index)); | |
| } | |
| else | |
| { | |
| cluster.restartServer(instanceSpec); | |
| } | |
| } | |
| private void test() throws Exception | |
| { | |
| cluster.start(); | |
| for ( CuratorFramework client : clients ) | |
| { | |
| client.start(); | |
| } | |
| for ( LeaderSelector selector : selectors ) | |
| { | |
| selector.autoRequeue(); | |
| selector.start(); | |
| } | |
| for (int i = 0; i < 100000; i++) | |
| { | |
| Thread.sleep(3000); | |
| int index = (i % SERVER_QTY); | |
| restartOneServer(index); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment