Created
January 15, 2018 12:50
-
-
Save maslick/cb29e4053d7a5dfa5b16b302bff060fc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.maslick.threading; | |
import com.google.common.util.concurrent.CycleDetectingLockFactory; | |
import com.google.common.util.concurrent.ListeningExecutorService; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import com.google.common.util.concurrent.Uninterruptibles; | |
import javax.annotation.Nullable; | |
import java.util.concurrent.*; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* Various threading related utilities. Provides a wrapper around explicit lock creation that lets you control whether | |
* bitcoinj performs cycle detection or not. Cycle detection is useful to detect bugs but comes with a small cost. | |
* Also provides a worker thread that is designed for event listeners to be dispatched on. | |
*/ | |
public class Threading { | |
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// User thread/event handling utilities | |
// | |
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
/** | |
* An executor with one thread that is intended for running event listeners on. This ensures all event listener code | |
* runs without any locks being held. It's intended for the API user to run things on. Callbacks registered by | |
* bitcoinj internally shouldn't normally run here, although currently there are a few exceptions. | |
*/ | |
public static Executor USER_THREAD; | |
/** | |
* A dummy executor that just invokes the runnable immediately. Use this over more complex executors | |
* (e.g. those extending {@link ExecutorService}), which are overkill for our needs. | |
*/ | |
public static final Executor SAME_THREAD; | |
/** | |
* Put a dummy task into the queue and wait for it to be run. Because it's single threaded, this means all | |
* tasks submitted before this point are now completed. Usually you won't want to use this method - it's a | |
* convenience primarily used in unit testing. If you want to wait for an event to be called the right thing | |
* to do is usually to create a {@link com.google.common.util.concurrent.SettableFuture} and then call set | |
* on it. You can then either block on that future, compose it, add listeners to it and so on. | |
*/ | |
public static void waitForUserCode() { | |
final CountDownLatch latch = new CountDownLatch(1); | |
USER_THREAD.execute(latch::countDown); | |
Uninterruptibles.awaitUninterruptibly(latch); | |
} | |
/** | |
* An exception handler that will be invoked for any exceptions that occur in the user thread, and | |
* any unhandled exceptions that are caught whilst the framework is processing network traffic or doing other | |
* background tasks. The purpose of this is to allow you to report back unanticipated crashes from your users | |
* to a central collection center for analysis and debugging. You should configure this <b>before</b> any | |
* bitcoinj library code is run, setting it after you started network traffic and other forms of processing | |
* may result in the change not taking effect. | |
*/ | |
@Nullable | |
public static volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler; | |
public static class UserThread extends Thread implements Executor { | |
// 10,000 pending tasks is entirely arbitrary and may or may not be appropriate for the device we're | |
// running on. | |
public static int WARNING_THRESHOLD = 10000; | |
private LinkedBlockingQueue<Runnable> tasks; | |
public UserThread() { | |
super("bitcoinj user thread"); | |
setDaemon(true); | |
tasks = new LinkedBlockingQueue<>(); | |
start(); | |
} | |
@SuppressWarnings("InfiniteLoopStatement") | |
@Override | |
public void run() { | |
while (true) { | |
Runnable task = Uninterruptibles.takeUninterruptibly(tasks); | |
try { | |
task.run(); | |
} catch (Throwable throwable) { | |
System.out.println("Exception in user thread: " + throwable.getMessage()); | |
UncaughtExceptionHandler handler = uncaughtExceptionHandler; | |
if (handler != null) | |
handler.uncaughtException(this, throwable); | |
} | |
} | |
} | |
@Override | |
public void execute(Runnable command) { | |
final int size = tasks.size(); | |
if (size == WARNING_THRESHOLD) { | |
System.out.println( | |
"User thread has {} pending tasks, memory exhaustion may occur.\n" + | |
"If you see this message, check your memory consumption and see if it's problematic or excessively spikey.\n" + | |
"If it is, check for deadlocked or slow event handlers. If it isn't, try adjusting the constant \n" + | |
"Threading.UserThread.WARNING_THRESHOLD upwards until it's a suitable level for your app, or Integer.MAX_VALUE to disable."); | |
} | |
Uninterruptibles.putUninterruptibly(tasks, command); | |
} | |
} | |
static { | |
// Default policy goes here. If you want to change this, use one of the static methods before | |
// instantiating any bitcoinj objects. The policy change will take effect only on new objects | |
// from that point onwards. | |
throwOnLockCycles(); | |
USER_THREAD = new UserThread(); | |
SAME_THREAD = Runnable::run; | |
} | |
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// Cycle detecting lock factories | |
// | |
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
private static CycleDetectingLockFactory.Policy policy; | |
public static CycleDetectingLockFactory factory; | |
public static ReentrantLock lock(String name) { | |
if (isAndroidRuntime()) | |
return new ReentrantLock(true); | |
else | |
return factory.newReentrantLock(name); | |
} | |
public static void warnOnLockCycles() { | |
setPolicy(CycleDetectingLockFactory.Policies.WARN); | |
} | |
public static void throwOnLockCycles() { | |
setPolicy(CycleDetectingLockFactory.Policies.THROW); | |
} | |
public static void ignoreLockCycles() { | |
setPolicy(CycleDetectingLockFactory.Policies.DISABLED); | |
} | |
public static void setPolicy(CycleDetectingLockFactory.Policy policy) { | |
Threading.policy = policy; | |
factory = CycleDetectingLockFactory.newInstance(policy); | |
} | |
public static CycleDetectingLockFactory.Policy getPolicy() { | |
return policy; | |
} | |
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// Generic worker pool. | |
// | |
///////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
/** A caching thread pool that creates daemon threads, which won't keep the JVM alive waiting for more work. */ | |
public static ListeningExecutorService THREAD_POOL = MoreExecutors.listeningDecorator( | |
Executors.newCachedThreadPool(r -> { | |
Thread t = new Thread(r); | |
t.setName("Threading.THREAD_POOL worker"); | |
t.setDaemon(true); | |
return t; | |
}) | |
); | |
private static int isAndroid = -1; | |
public static boolean isAndroidRuntime() { | |
if (isAndroid == -1) { | |
final String runtime = System.getProperty("java.runtime.name"); | |
isAndroid = (runtime != null && runtime.equals("Android Runtime")) ? 1 : 0; | |
} | |
return isAndroid == 1; | |
} | |
} |
Author
maslick
commented
Feb 4, 2018
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment