Last active
July 25, 2019 01:03
-
-
Save trenthudy/d6cb6b223676cae349a2c40398dc45aa to your computer and use it in GitHub Desktop.
A simple Android threading solution, that allows work to be lifted from the main thread and delegated to a background thread pool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.hudepohl.custom_thread_sample; | |
import android.os.Handler; | |
import android.os.Looper; | |
import android.os.Message; | |
import androidx.annotation.MainThread; | |
import androidx.annotation.WorkerThread; | |
import org.jetbrains.annotations.NotNull; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.FutureTask; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Created by Trent Hudepohl on 4/12/18. | |
* | |
* A simple Android threading solution, that allows work to be lifted from the main thread | |
* and delegated to a background thread pool. After the background work completes, its result | |
* is dispatched back to the main thread. | |
* | |
* This solution is inspired by Android's 'android.os.AsyncTask', but aims to improve some of its | |
* flaws / missing features: | |
* | |
* - Too abstract for most use cases (i.e. it requires three generic type parameters). | |
* - Lack of a clear distinction between an "error" and a "result". | |
* - Exceptions thrown during the work execution are not passed back cleanly. | |
* - Lack of a clear state update callback (i.e. when the work starts "processing"). | |
* - Exposes the complexities of multi-threading, when most use cases do not require it. | |
* | |
* Uses 'java.util.concurrent.ThreadPoolExecutor' to create a thread pool with a size based on | |
* the device's CPU count. While this should satisfy the majority of use cases, projects doing | |
* a ton of background processing should consider a more robust solution (like RxJava). | |
* | |
* The work is not started until 'execute()' is called. All instances of BackgroundWork can only | |
* be executed one time. After 'execute()' is called the first time, subsequent calls will result | |
* in an 'java.lang.IllegalStateException'. | |
* | |
* Work can be cancelled by calling 'cancel()'. It is recommended to call 'cancel()' on work that | |
* is no longer needed whenever possible. Calling 'execute()' on work that has already been | |
* cancelled will result in an 'java.lang.IllegalStateException'. | |
* | |
* The three terminating states are 'RESULT', 'ERROR', and 'CANCELLED'. After 'onStateChange()' | |
* has received one of these states, no other callbacks will be invoked. | |
*/ | |
public abstract class BackgroundWork<T> { | |
private static final String THREAD_NAME_PREFACE = "BackgroundWork_"; | |
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { | |
private final AtomicInteger mCount = new AtomicInteger(1); | |
public Thread newThread(Runnable r) { | |
return new Thread(r, THREAD_NAME_PREFACE + mCount.getAndIncrement()); | |
} | |
}; | |
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); | |
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4)); | |
private static final int MAXIMUM_POOL_SIZE = (CPU_COUNT * 2); | |
private static final int KEEP_ALIVE_TIME = 30; | |
private static final TimeUnit KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS; | |
private static final BlockingQueue<Runnable> POOL_WORK_QUEUE = new LinkedBlockingQueue<>(128); | |
private static final Executor THREAD_POOL_EXECUTOR; | |
static { | |
ThreadPoolExecutor threadPoolExecutor = | |
new ThreadPoolExecutor( | |
CORE_POOL_SIZE, | |
MAXIMUM_POOL_SIZE, | |
KEEP_ALIVE_TIME, | |
KEEP_ALIVE_TIME_UNIT, | |
POOL_WORK_QUEUE, | |
THREAD_FACTORY | |
); | |
threadPoolExecutor.allowCoreThreadTimeOut(true); | |
THREAD_POOL_EXECUTOR = threadPoolExecutor; | |
} | |
private final AtomicBoolean wasInvoked = new AtomicBoolean(false); | |
private final AtomicBoolean wasErrored = new AtomicBoolean(false); | |
private final AtomicBoolean wasCancelled = new AtomicBoolean(false); | |
private final Handler handler; | |
private final FutureTask<T> futureTask; | |
private T result = null; | |
private Throwable error = null; | |
public BackgroundWork() { | |
this.handler = new MainThreadHandler(); | |
Callable<T> callableWork = new Callable<T>() { | |
@Override | |
public T call() { | |
wasInvoked.set(true); | |
publishProcessing(); | |
try { | |
result = doBackgroundWork(); | |
publishResult(); | |
} catch (Exception exception) { | |
if (!wasCancelled.get()) { | |
if (wasErrored.compareAndSet(false, true)) { | |
error = exception; | |
publishError(); | |
} | |
} | |
} | |
return result; | |
} | |
}; | |
this.futureTask = new FutureTask<T>(callableWork) { | |
@Override | |
protected void done() { | |
try { | |
get(); | |
} catch (Exception exception) { | |
if (!wasCancelled.get()) { | |
if (wasErrored.compareAndSet(false, true)) { | |
error = (exception instanceof ExecutionException) ? | |
exception.getCause() : exception; | |
publishError(); | |
} | |
} | |
} | |
} | |
}; | |
} | |
private void publishPending() { | |
this.handler.obtainMessage(MESSAGE_PENDING, this).sendToTarget(); | |
} | |
private void publishProcessing() { | |
this.handler.obtainMessage(MESSAGE_PROCESSING, this).sendToTarget(); | |
} | |
private void publishResult() { | |
this.handler.obtainMessage(MESSAGE_RESULT, this).sendToTarget(); | |
} | |
private void publishError() { | |
this.handler.obtainMessage(MESSAGE_ERROR, this).sendToTarget(); | |
} | |
private void publishCancelled() { | |
if (this.wasCancelled.compareAndSet(false, true)) { | |
this.handler.obtainMessage(MESSAGE_CANCELLED, this).sendToTarget(); | |
} | |
} | |
public void execute() { | |
if (this.wasInvoked.get()) throw new IllegalStateException("BackgroundWork was already executed"); | |
if (this.wasCancelled.get()) throw new IllegalStateException("BackgroundWork was cancelled"); | |
publishPending(); | |
THREAD_POOL_EXECUTOR.execute(this.futureTask); | |
} | |
public final boolean cancel() { | |
publishCancelled(); | |
return this.futureTask.cancel(true); | |
} | |
@WorkerThread | |
protected abstract T doBackgroundWork(); | |
@MainThread | |
protected abstract void onResult(@NotNull T result); | |
@MainThread | |
protected abstract void onError(@NotNull Throwable error); | |
public enum State { | |
PENDING, | |
PROCESSING, | |
RESULT, | |
ERROR, | |
CANCELLED | |
} | |
@MainThread | |
protected abstract void onStateChange(@NotNull State workState); | |
private static final int MESSAGE_PENDING = 0x1; | |
private static final int MESSAGE_PROCESSING = 0x2; | |
private static final int MESSAGE_RESULT = 0x3; | |
private static final int MESSAGE_ERROR = 0x4; | |
private static final int MESSAGE_CANCELLED = 0x5; | |
private static class MainThreadHandler extends Handler { | |
MainThreadHandler() { | |
super(Looper.getMainLooper()); | |
} | |
@Override | |
public void handleMessage(Message msg) { | |
BackgroundWork<?> work = (BackgroundWork<?>) msg.obj; | |
switch (msg.what) { | |
case MESSAGE_PENDING: | |
work.notifyPending(); | |
break; | |
case MESSAGE_PROCESSING: | |
work.notifyProcessing(); | |
break; | |
case MESSAGE_RESULT: | |
work.notifyResult(); | |
break; | |
case MESSAGE_ERROR: | |
work.notifyError(); | |
break; | |
case MESSAGE_CANCELLED: | |
work.notifyCancelled(); | |
break; | |
} | |
} | |
} | |
private void notifyPending() { | |
onStateChange(State.PENDING); | |
} | |
private void notifyProcessing() { | |
onStateChange(State.PROCESSING); | |
} | |
private void notifyResult() { | |
if (this.result != null) { | |
onResult(this.result); | |
} | |
onStateChange(State.RESULT); | |
} | |
private void notifyError() { | |
if (this.error != null) { | |
onError(this.error); | |
} | |
onStateChange(State.ERROR); | |
} | |
private void notifyCancelled() { | |
onStateChange(State.CANCELLED); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BackgroundWorkExample { | |
fun doSomethingCool() { | |
val work = object : BackgroundWork<String>() { | |
override fun doBackgroundWork(): String { | |
Thread.sleep(10_000) | |
return "Hello, world!" | |
} | |
override fun onResult(result: String) { | |
} | |
override fun onError(error: Throwable) { | |
} | |
override fun onStateChange(workState: State) { | |
when (workState) { | |
State.PENDING, | |
State.PROCESSING, | |
State.RESULT, | |
State.ERROR, | |
State.CANCELLED -> { | |
} | |
} | |
} | |
} | |
// do the work | |
work.execute() | |
// if you change your mind | |
work.cancel() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment