Created
July 17, 2018 06:38
-
-
Save yoshi0309/36a58d6c2130057d649f8197e93309ea to your computer and use it in GitHub Desktop.
CuratorFramework experiment: handling a RuntimeException that is thrown in a Listener.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
/**
 * Uncaught-exception handler used by the experiment: it reports the failure on
 * standard output, prints the stack trace, and then pauses the calling thread
 * for a short, fixed time.
 */
public class ExceptionHandler
        implements Thread.UncaughtExceptionHandler {

    /** Pause applied after reporting an exception, in milliseconds. */
    private static final long WAIT_TIME = 60L;

    /**
     * Reports the given throwable and then sleeps for {@link #WAIT_TIME} milliseconds.
     *
     * @param thread the thread that terminated because of {@code e}
     * @param e      the uncaught throwable
     */
    @Override
    public void uncaughtException(Thread thread, Throwable e) {
        // -------------------------------------------------
        // The RuntimeException thrown in TestListener should be handled here,
        // but in the experiment it is not.
        // NOTE(review): presumably the exception is intercepted before it reaches
        // the thread's uncaught-exception path (e.g. by the executor or by
        // LeaderSelector itself) — confirm against the Curator sources.
        // -------------------------------------------------
        System.out.println("Exception has occurred.");
        e.printStackTrace();

        System.out.println("sleep thread in " + WAIT_TIME);
        try {
            TimeUnit.MILLISECONDS.sleep(WAIT_TIME);
        } catch (InterruptedException interrupted) {
            // Sleeping clears the interrupt flag when it throws; restore it so
            // that whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.Executors; | |
import java.util.concurrent.ThreadFactory; | |
/**
 * {@link ThreadFactory} that builds threads through
 * {@link Executors#defaultThreadFactory()} and installs a fixed
 * {@link Thread.UncaughtExceptionHandler} on each of them.
 */
public class ExceptionThreadFactory implements ThreadFactory {

    /** Shared factory that actually creates the threads. */
    private static final ThreadFactory DELEGATE = Executors.defaultThreadFactory();

    /** Handler attached to every thread produced by this factory. */
    private final Thread.UncaughtExceptionHandler handler;

    /**
     * @param handler the uncaught-exception handler to install on new threads
     */
    public ExceptionThreadFactory(Thread.UncaughtExceptionHandler handler) {
        this.handler = handler;
    }

    /**
     * Creates a thread for {@code run} and attaches the configured handler.
     *
     * @param run the task the new thread will execute
     * @return the thread produced by the default factory, with the handler set
     */
    @Override
    public Thread newThread(Runnable run) {
        final Thread created = DELEGATE.newThread(run);
        created.setUncaughtExceptionHandler(this.handler);
        return created;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.curator.framework.CuratorFramework; | |
import org.apache.curator.framework.CuratorFrameworkFactory; | |
import org.apache.curator.framework.recipes.leader.LeaderSelector; | |
import org.apache.curator.retry.ExponentialBackoffRetry; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Set; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ThreadFactory; | |
public class Main1 { | |
public static void main(String[] args) { | |
List<String> hosts = new ArrayList<String>(); | |
hosts.add("my.test.zookeeper:2181"); | |
Set<String> newSet = new HashSet<>(hosts); | |
newSet = Collections.unmodifiableSet(newSet); | |
final ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(500,3); | |
CuratorFramework curatorFramework = null; | |
try { | |
curatorFramework = createCuratorFramework(newSet, 3000, 3000, 3, 3000, backoffRetry); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
TestListener listener = new TestListener(); | |
String electionNode = "/somewhere/leader"; // for test | |
ThreadFactory factory = new ExceptionThreadFactory(new ExceptionHandler()); | |
ExecutorService executorService = Executors.newFixedThreadPool(1, factory); | |
LeaderSelector selector = new LeaderSelector(curatorFramework, electionNode, executorService, listener); | |
// start | |
System.out.println("start selector."); | |
selector.autoRequeue(); | |
selector.start(); | |
} | |
private static CuratorFramework createCuratorFramework( | |
final Set<String> connectionSet, | |
final int sessionTimeoutMs, | |
final int connectionTimeoutMs, | |
final int connectionRetry, | |
final int maxCloseWaitMs, | |
final ExponentialBackoffRetry backoffRetry | |
) throws InterruptedException { | |
CuratorFramework curatorFramework = CuratorFrameworkFactory | |
.builder() | |
.connectString(String.join(",", connectionSet)) | |
.connectionTimeoutMs(connectionTimeoutMs) | |
.sessionTimeoutMs(sessionTimeoutMs) | |
.maxCloseWaitMs(maxCloseWaitMs) | |
.retryPolicy(backoffRetry) | |
.defaultData(new byte[0]) | |
.build(); | |
curatorFramework.start(); | |
long start = System.nanoTime(); | |
boolean connection = false; | |
for (int i = 0; i < connectionRetry; i++) { | |
connection = curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut(); | |
if (connection) { | |
break; | |
} | |
} | |
return curatorFramework; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.curator.framework.CuratorFramework; | |
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; | |
import org.apache.curator.framework.state.ConnectionState; | |
public class TestListener implements LeaderSelectorListener { | |
@Override | |
public void takeLeadership(CuratorFramework client) throws Exception { | |
System.out.println("start takeLeadership."); | |
// throw excetipn | |
throw new RuntimeException("some runtime exception has been occurred !"); | |
//System.out.println("end takeLeadership."); | |
} | |
@Override | |
public void stateChanged(final CuratorFramework client, final ConnectionState newState) { | |
System.out.println("Listener state changed: " + newState.toString()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment