Skip to content

Instantly share code, notes, and snippets.

@yoshi0309
Created July 17, 2018 06:38

Revisions

  1. yoshi0309 created this gist Jul 17, 2018.
    23 changes: 23 additions & 0 deletions ExceptionHandler.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,23 @@
    import java.util.concurrent.TimeUnit;

    public class ExceptionHandler
    implements Thread.UncaughtExceptionHandler {

    private final long WAIT_TIME = 60L;

    @Override public void uncaughtException(Thread thread, Throwable e) {
    // -------------------------------------------------
    // RuntimeException which was throwed in TestListener should handled here, but not.
    // -------------------------------------------------

    System.out.println("Exception has occurred.");
    e.printStackTrace();
    System.out.println("sleep thread in " + WAIT_TIME);

    try {
    TimeUnit.MILLISECONDS.sleep(WAIT_TIME);
    } catch (InterruptedException e1) {
    return;
    }
    }
    }
    18 changes: 18 additions & 0 deletions ExceptionThreadFactory.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,18 @@
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;

    public class ExceptionThreadFactory implements ThreadFactory {
    private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
    private final Thread.UncaughtExceptionHandler handler;

    public ExceptionThreadFactory(
    Thread.UncaughtExceptionHandler handler) {
    this.handler = handler;
    }

    @Override public Thread newThread(Runnable run) {
    Thread thread = defaultFactory.newThread(run);
    thread.setUncaughtExceptionHandler(handler);
    return thread;
    }
    }
    73 changes: 73 additions & 0 deletions Main1.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;

    public class Main1 {
    public static void main(String[] args) {
    List<String> hosts = new ArrayList<String>();
    hosts.add("my.test.zookeeper:2181");

    Set<String> newSet = new HashSet<>(hosts);
    newSet = Collections.unmodifiableSet(newSet);
    final ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(500,3);
    CuratorFramework curatorFramework = null;
    try {
    curatorFramework = createCuratorFramework(newSet, 3000, 3000, 3, 3000, backoffRetry);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    TestListener listener = new TestListener();
    String electionNode = "/somewhere/leader"; // for test

    ThreadFactory factory = new ExceptionThreadFactory(new ExceptionHandler());
    ExecutorService executorService = Executors.newFixedThreadPool(1, factory);

    LeaderSelector selector = new LeaderSelector(curatorFramework, electionNode, executorService, listener);

    // start
    System.out.println("start selector.");
    selector.autoRequeue();
    selector.start();
    }

    private static CuratorFramework createCuratorFramework(
    final Set<String> connectionSet,
    final int sessionTimeoutMs,
    final int connectionTimeoutMs,
    final int connectionRetry,
    final int maxCloseWaitMs,
    final ExponentialBackoffRetry backoffRetry
    ) throws InterruptedException {
    CuratorFramework curatorFramework = CuratorFrameworkFactory
    .builder()
    .connectString(String.join(",", connectionSet))
    .connectionTimeoutMs(connectionTimeoutMs)
    .sessionTimeoutMs(sessionTimeoutMs)
    .maxCloseWaitMs(maxCloseWaitMs)
    .retryPolicy(backoffRetry)
    .defaultData(new byte[0])
    .build();
    curatorFramework.start();
    long start = System.nanoTime();
    boolean connection = false;

    for (int i = 0; i < connectionRetry; i++) {
    connection = curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut();
    if (connection) {
    break;
    }
    }
    return curatorFramework;
    }
    }
    21 changes: 21 additions & 0 deletions TestListener.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,21 @@
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
    import org.apache.curator.framework.state.ConnectionState;

    public class TestListener implements LeaderSelectorListener {

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
    System.out.println("start takeLeadership.");

    // throw excetipn
    throw new RuntimeException("some runtime exception has been occurred !");

    //System.out.println("end takeLeadership.");
    }

    @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
    System.out.println("Listener state changed: " + newState.toString());
    }
    }