Skip to content

Instantly share code, notes, and snippets.

@maslick
Created January 15, 2018 12:50
Show Gist options
  • Save maslick/cb29e4053d7a5dfa5b16b302bff060fc to your computer and use it in GitHub Desktop.
Save maslick/cb29e4053d7a5dfa5b16b302bff060fc to your computer and use it in GitHub Desktop.
package com.maslick.threading;
import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import javax.annotation.Nullable;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Various threading related utilities. Provides a wrapper around explicit lock creation that lets you control whether
 * bitcoinj performs cycle detection or not. Cycle detection is useful to detect bugs but comes with a small cost.
 * Also provides a worker thread that is designed for event listeners to be dispatched on, and a shared pool of
 * daemon worker threads.
 */
public class Threading {
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    //
    // User thread/event handling utilities
    //
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////

    /**
     * An executor with one thread that is intended for running event listeners on. This ensures all event listener code
     * runs without any locks being held. It's intended for the API user to run things on. Callbacks registered by
     * bitcoinj internally shouldn't normally run here, although currently there are a few exceptions.
     *
     * <p>Initialised to a {@link UserThread} by the static initialiser of this class. The field is deliberately
     * non-final so it can be replaced with a different executor.
     */
    public static Executor USER_THREAD;

    /**
     * A dummy executor that just invokes the runnable immediately. Use this over more complex executors
     * (e.g. those extending {@link ExecutorService}), which are overkill for our needs.
     */
    public static final Executor SAME_THREAD;

    /**
     * Put a dummy task into the queue and wait for it to be run. Because it's single threaded, this means all
     * tasks submitted before this point are now completed. Usually you won't want to use this method - it's a
     * convenience primarily used in unit testing. If you want to wait for an event to be called the right thing
     * to do is usually to create a {@link com.google.common.util.concurrent.SettableFuture} and then call set
     * on it. You can then either block on that future, compose it, add listeners to it and so on.
     *
     * <p>The "all earlier tasks are completed" guarantee relies on {@link #USER_THREAD} running tasks one at a
     * time in submission order, as the default {@link UserThread} does. The wait is uninterruptible.
     */
    public static void waitForUserCode() {
        final CountDownLatch latch = new CountDownLatch(1);
        USER_THREAD.execute(latch::countDown);
        Uninterruptibles.awaitUninterruptibly(latch);
    }

    /**
     * An exception handler that will be invoked for any exceptions that occur in the user thread, and
     * any unhandled exceptions that are caught whilst the framework is processing network traffic or doing other
     * background tasks. The purpose of this is to allow you to report back unanticipated crashes from your users
     * to a central collection center for analysis and debugging. You should configure this <b>before</b> any
     * bitcoinj library code is run, setting it after you started network traffic and other forms of processing
     * may result in the change not taking effect.
     */
    @Nullable
    public static volatile Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    /**
     * A daemon thread that doubles as an {@link Executor}: commands passed to {@link #execute(Runnable)} are
     * queued and then run one at a time, in submission order, on this thread. The thread starts itself from
     * its constructor.
     */
    public static class UserThread extends Thread implements Executor {
        /**
         * Queue size at which {@link #execute(Runnable)} prints a memory-exhaustion warning.
         * 10,000 pending tasks is entirely arbitrary and may or may not be appropriate for the device we're
         * running on. Raise it to suit your app, or set it to {@link Integer#MAX_VALUE} to disable the warning.
         */
        public static int WARNING_THRESHOLD = 10000;

        // Initialised here (field initialisers run before the constructor body) so the queue is guaranteed to
        // exist before start() is invoked. Constructed without a capacity bound.
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        /** Creates the thread, marks it as a daemon so it won't keep the JVM alive, and starts it immediately. */
        public UserThread() {
            super("bitcoinj user thread");
            setDaemon(true);
            start();
        }

        /**
         * Takes tasks off the queue forever and runs them one by one. Anything thrown by a task is caught so the
         * thread keeps serving later tasks; the failure is printed to standard output and then passed to
         * {@link Threading#uncaughtExceptionHandler} if one is set.
         */
        @SuppressWarnings("InfiniteLoopStatement")
        @Override
        public void run() {
            while (true) {
                Runnable task = Uninterruptibles.takeUninterruptibly(tasks);
                try {
                    task.run();
                } catch (Throwable throwable) {
                    // Print the throwable itself rather than getMessage(): the message alone drops the exception
                    // type and is simply "null" for many exceptions (e.g. most NullPointerExceptions).
                    System.out.println("Exception in user thread: " + throwable);
                    // Read the volatile field once so the null check and the call see the same handler.
                    Thread.UncaughtExceptionHandler handler = Threading.uncaughtExceptionHandler;
                    if (handler != null) {
                        handler.uncaughtException(this, throwable);
                    }
                }
            }
        }

        /**
         * Queues {@code command} to be run later on this thread. Prints a one-off style warning to standard output
         * when the number of already-pending tasks is exactly {@link #WARNING_THRESHOLD}.
         *
         * @param command the task to run on the user thread
         */
        @Override
        public void execute(Runnable command) {
            final int size = tasks.size();
            // Equality (not >=) keeps the warning from being repeated for every task while the backlog persists.
            if (size == WARNING_THRESHOLD) {
                System.out.println(
                        "User thread has " + size + " pending tasks, memory exhaustion may occur.\n" +
                        "If you see this message, check your memory consumption and see if it's problematic or excessively spikey.\n" +
                        "If it is, check for deadlocked or slow event handlers. If it isn't, try adjusting the constant \n" +
                        "Threading.UserThread.WARNING_THRESHOLD upwards until it's a suitable level for your app, or Integer.MAX_VALUE to disable.");
            }
            Uninterruptibles.putUninterruptibly(tasks, command);
        }
    }

    static {
        // Default policy goes here. If you want to change this, use one of the static methods before
        // instantiating any bitcoinj objects. The policy change will take effect only on new objects
        // from that point onwards.
        throwOnLockCycles();

        USER_THREAD = new UserThread();
        SAME_THREAD = Runnable::run;
    }

    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    //
    // Cycle detecting lock factories
    //
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////

    // NOTE: these two fields are assigned by the static initialiser above (via throwOnLockCycles()). They must not
    // be given inline initialisers here, because those would run after the static block and overwrite its values.
    private static CycleDetectingLockFactory.Policy policy;

    /** The factory used by {@link #lock(String)} on non-Android runtimes; replaced by {@link #setPolicy}. */
    public static CycleDetectingLockFactory factory;

    /**
     * Creates a new lock. On the Android runtime this is a plain fair {@link ReentrantLock}; everywhere else the
     * lock comes from the current cycle-detecting {@link #factory}.
     *
     * @param name the name handed to the cycle-detecting factory (unused on Android)
     * @return a newly created lock
     */
    public static ReentrantLock lock(String name) {
        if (isAndroidRuntime()) {
            return new ReentrantLock(true);
        } else {
            return factory.newReentrantLock(name);
        }
    }

    /** Switches to the {@link CycleDetectingLockFactory.Policies#WARN} policy for locks created from now on. */
    public static void warnOnLockCycles() {
        setPolicy(CycleDetectingLockFactory.Policies.WARN);
    }

    /** Switches to the {@link CycleDetectingLockFactory.Policies#THROW} policy for locks created from now on. */
    public static void throwOnLockCycles() {
        setPolicy(CycleDetectingLockFactory.Policies.THROW);
    }

    /** Switches to the {@link CycleDetectingLockFactory.Policies#DISABLED} policy for locks created from now on. */
    public static void ignoreLockCycles() {
        setPolicy(CycleDetectingLockFactory.Policies.DISABLED);
    }

    /**
     * Records {@code policy} and replaces {@link #factory} with a new factory built for it. Locks that were
     * already handed out by the previous factory are not touched.
     *
     * @param policy the cycle-detection policy for the new factory
     */
    public static void setPolicy(CycleDetectingLockFactory.Policy policy) {
        Threading.policy = policy;
        factory = CycleDetectingLockFactory.newInstance(policy);
    }

    /** @return the policy most recently passed to {@link #setPolicy} (directly or via the convenience methods) */
    public static CycleDetectingLockFactory.Policy getPolicy() {
        return policy;
    }

    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    //
    // Generic worker pool.
    //
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////

    /** A caching thread pool that creates daemon threads, which won't keep the JVM alive waiting for more work. */
    public static ListeningExecutorService THREAD_POOL = MoreExecutors.listeningDecorator(
            Executors.newCachedThreadPool(r -> {
                Thread t = new Thread(r);
                t.setName("Threading.THREAD_POOL worker");
                t.setDaemon(true);
                return t;
            })
    );

    // Cached result of the runtime check: -1 = not determined yet, 0 = not Android, 1 = Android.
    private static int isAndroid = -1;

    /**
     * Reports whether the {@code java.runtime.name} system property equals {@code "Android Runtime"}. The property
     * is read on the first call and the answer is cached for later calls.
     *
     * @return {@code true} if running on the Android runtime
     */
    public static boolean isAndroidRuntime() {
        if (isAndroid == -1) {
            final String runtime = System.getProperty("java.runtime.name");
            // Constant-first equals() is null-safe, so a missing property simply counts as "not Android".
            isAndroid = "Android Runtime".equals(runtime) ? 1 : 0;
        }
        return isAndroid == 1;
    }
}
@maslick
Copy link
Author

maslick commented Feb 4, 2018

dependencies {
    // Guava: supplies the com.google.common.util.concurrent classes imported by Threading
    compile(group: 'com.google.guava', name: 'guava', version: '23.6-android')
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment