Created
October 20, 2023 19:36
-
-
Save arindamxd/d72746493e384d54852b4460ce317d13 to your computer and use it in GitHub Desktop.
TaskScheduler: A background task scheduler with a delay and override feature.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.arindam.scheduler
import android.os.Handler
import android.os.Looper
import android.util.Log
import java.util.concurrent.CancellationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
/**
 * A helper object that schedules [Runnable]s, after a delay, on a single dedicated background
 * thread.
 *
 * Tasks that have not run yet are tracked in a list so that a new submission can optionally
 * replace (cancel) everything that is still pending; see [scheduleTask].
 *
 * Thread-safety: the pending-task bookkeeping is touched both by callers of
 * [scheduleTask]/[cancelTask] and by the worker thread when a delay elapses, so every access to it
 * goes through one private monitor. User-supplied [Runnable]s are always invoked outside of that
 * monitor.
 *
 * Created by Arindam Karmakar on 20/10/23.
 */
object TaskScheduler {

    private val tag = TaskScheduler::class.java.name

    /**
     * Monitor guarding [delayedTasks] and the pending state of every [DelayedTask].
     *
     * Lock ordering: this monitor is always taken before the executor's own monitor (the
     * `@Synchronized` methods of the executor wrapper), never the other way around.
     */
    private val lock = Any()

    /**
     * Represents a task scheduled to be run in the future on the [TaskScheduler] thread.
     *
     * Instances are created and started by [scheduleTask]. A task runs at most once and can be
     * canceled via [cancel] for as long as it has not been picked up for execution.
     */
    class DelayedTask(private val task: Runnable) {

        // The ScheduledFuture returned by executor.schedule(). It is non-null only while the task
        // is pending and is reset to null after the task has been run or canceled.
        // Guarded by `lock`.
        private var scheduledFuture: ScheduledFuture<*>? = null

        /**
         * Hands this task to the executor so that it runs after [delayMs] milliseconds.
         *
         * The lock is held while scheduling so that, even with a very short delay, the worker
         * thread cannot enter [handleDelayElapsed] before [scheduledFuture] has been assigned;
         * otherwise it would observe `null` and silently skip the task.
         *
         * @param delayMs The delay, in milliseconds, before the task should run.
         */
        internal fun start(delayMs: Long) {
            synchronized(lock) {
                scheduledFuture = executor.schedule(Runnable { handleDelayElapsed() }, delayMs)
            }
        }

        /**
         * Cancels the task if it hasn't already been executed or canceled.
         *
         * If this call wins the race against the worker thread (i.e. the task is still marked as
         * pending when the lock is acquired), the task is guaranteed not to run.
         */
        internal fun cancel() {
            synchronized(lock) {
                val pending = scheduledFuture ?: return
                // NOTE: We don't rely on this cancel() succeeding since handleDelayElapsed() will
                // become a no-op anyway (markDone() sets scheduledFuture to null).
                pending.cancel(/* mayInterruptIfRunning = */ false)
                markDone()
            }
        }

        /** Invoked by the executor once the delay has elapsed; runs the task unless canceled. */
        private fun handleDelayElapsed() {
            verifyIsCurrentThread()
            val shouldRun = synchronized(lock) {
                if (scheduledFuture == null) {
                    false
                } else {
                    markDone()
                    true
                }
            }
            // Run outside the lock so a long-running task never blocks callers of
            // scheduleTask()/cancelTask(), and so the task itself may schedule new work.
            if (shouldRun) task.run()
        }

        /**
         * Marks this delayed task as done and removes it from the scheduler's pending list.
         * Callers must hold `lock`.
         */
        private fun markDone() {
            if (scheduledFuture == null) Log.e(tag, "Caller should have verified scheduledFuture is non-null.")
            scheduledFuture = null
            removeDelayedTask(this)
        }
    }

    /**
     * A wrapper around a [ScheduledThreadPoolExecutor] that provides:
     *
     * 1. Synchronized task scheduling. This is different from point 3, which is about task
     *    execution in a single thread.
     * 2. A shutdown check: nothing is submitted once [isShuttingDown] is `true`.
     * 3. Single threaded execution, no concurrent execution among the `Runnable`s scheduled in
     *    this Executor.
     */
    private class SynchronizedShutdownAwareExecutor : Executor {

        /** The single threaded executor that is backing this Executor. */
        private val internalExecutor: ScheduledThreadPoolExecutor

        /**
         * Whether the shutdown process has been initiated.
         *
         * NOTE(review): this is initialized to `false` and nothing in this file ever sets it to
         * `true`, so the shutdown checks below are currently never triggered.
         */
        @get:Synchronized
        private val isShuttingDown: Boolean

        /**
         * The single thread that will be used by the executor. This is created early and managed
         * directly so that it's possible later to make assertions about executing on the correct
         * thread.
         */
        val thread: Thread

        /**
         * A [ThreadFactory] for a single, pre-created thread.
         *
         * The factory itself is the body of [thread]: once started it blocks on a latch until the
         * executor asks for a thread via [newThread], and then runs the executor's worker.
         */
        private inner class DelayedStartFactory : Runnable, ThreadFactory {
            private val latch = CountDownLatch(1)
            private var delegate: Runnable? = null

            override fun run() {
                try {
                    latch.await()
                } catch (e: InterruptedException) {
                    Thread.currentThread().interrupt()
                }
                delegate?.run()
            }

            override fun newThread(runnable: Runnable): Thread {
                if (delegate != null) Log.e(tag, "Only one thread may be created in an $tag.")
                delegate = runnable
                // Release run(), which is waiting for the executor's worker to be installed.
                latch.countDown()
                return thread
            }
        }

        init {
            val threadFactory = DelayedStartFactory()
            thread = Executors.defaultThreadFactory().newThread(threadFactory)
            thread.name = "TokenRefreshWorker"
            thread.isDaemon = true
            thread.uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { _, throwable ->
                panic(throwable)
            }

            internalExecutor = object : ScheduledThreadPoolExecutor(1, threadFactory) {
                override fun afterExecute(r: Runnable?, t: Throwable?) {
                    super.afterExecute(r, t)
                    var failure: Throwable? = t
                    // Scheduled work is wrapped in a Future that captures any exception thrown by
                    // the task, so it has to be unwrapped here to be noticed at all.
                    if (failure == null && r is Future<*>) {
                        try {
                            // Not all Futures will be done, e.g. when used with scheduleAtFixedRate
                            if (r.isDone) r.get()
                        } catch (ce: CancellationException) {
                            // Cancellation exceptions are okay, we expect them to happen sometimes
                        } catch (ee: ExecutionException) {
                            failure = ee.cause
                        } catch (e: InterruptedException) {
                            Thread.currentThread().interrupt()
                        }
                    }
                    failure?.let { panic(it) }
                }
            }

            // Core threads don't time out, this only takes effect when we drop the
            // number of required core threads
            internalExecutor.setKeepAliveTime(3, TimeUnit.SECONDS)
            isShuttingDown = false
        }

        /**
         * Check if shutdown is initiated before scheduling. If it is initiated, the command will
         * not be executed.
         */
        @Synchronized
        override fun execute(command: Runnable) {
            if (!isShuttingDown) internalExecutor.execute(command)
        }

        /**
         * Wraps [ScheduledThreadPoolExecutor.schedule] and adds the shutdown state check.
         *
         * @param command The work to run.
         * @param delay The delay in milliseconds.
         * @return The future of the scheduled command, or `null` if shutdown has been initiated
         *   and the command was therefore not scheduled.
         */
        @Synchronized
        fun schedule(command: Runnable, delay: Long): ScheduledFuture<*>? {
            return if (!isShuttingDown) internalExecutor.schedule(command, delay, TimeUnit.MILLISECONDS) else null
        }

        /** Wraps around [ScheduledThreadPoolExecutor.shutdownNow]. */
        fun shutdownNow(): MutableList<Runnable> = internalExecutor.shutdownNow()

        /** Wraps around [ScheduledThreadPoolExecutor.setCorePoolSize]. */
        fun setCorePoolSize(size: Int) {
            internalExecutor.corePoolSize = size
        }
    }

    /** The executor backing this [TaskScheduler]. */
    private val executor: SynchronizedShutdownAwareExecutor = SynchronizedShutdownAwareExecutor()

    // Tasks scheduled to run in the future. Tasks are automatically removed after they are run
    // or canceled. Guarded by `lock`.
    // NOTE: This could be a Set<> which might have better theoretical removal speed, except this
    // list will always be small so ArrayList is fine.
    private val delayedTasks: ArrayList<DelayedTask> = ArrayList()

    /**
     * Schedules a task to run after the specified delay.
     *
     * Cancellation of previous tasks, scheduling and registration happen atomically with respect
     * to the worker thread, so a task can never complete before it has been registered.
     *
     * @param override Whether to cancel all tasks that are still pending before scheduling.
     * @param delayMs The delay, in milliseconds, after which the task will run.
     * @param task The task to run.
     * @return A [DelayedTask] instance that can be used for cancellation.
     */
    fun scheduleTask(override: Boolean = true, delayMs: Long, task: Runnable): DelayedTask {
        return synchronized(lock) {
            if (override) cancelTask()
            val delayedTask = scheduleDelayedTask(delayMs, task)
            delayedTasks.add(delayedTask)
            delayedTask
        }
    }

    /** Determines whether at least one delayed task is still pending. */
    internal fun isTaskScheduled(): Boolean = synchronized(lock) { delayedTasks.isNotEmpty() }

    /**
     * Cancels every task that is still pending.
     *
     * @return `true` if there was at least one pending task, `false` otherwise.
     */
    internal fun cancelTask(): Boolean {
        return synchronized(lock) {
            // Iterate over a snapshot: DelayedTask.cancel() removes the task from delayedTasks,
            // which would otherwise throw ConcurrentModificationException mid-iteration.
            val pending = delayedTasks.toList()
            pending.forEach { it.cancel() }
            // Drop entries cancel() did not remove, i.e. tasks whose scheduling was refused by the
            // executor and which therefore never had a future to cancel.
            delayedTasks.clear()
            pending.isNotEmpty()
        }
    }

    /**
     * Shuts down the [TaskScheduler] and releases resources after which no progress will ever be
     * made again.
     *
     * This only drops the executor's core pool size to zero; it does not cancel pending tasks.
     */
    internal fun shutdown() {
        // Will cause the executor to de-reference all threads, the best we can do
        executor.setCorePoolSize(0)
    }

    /**
     * Creates and returns a [DelayedTask] that has been scheduled to be executed on the executor
     * after the provided delay. The caller is responsible for registering it in [delayedTasks].
     *
     * @param delayMs The delay (ms) before the operation should be scheduled.
     * @param task The task to run.
     */
    private fun scheduleDelayedTask(
        delayMs: Long,
        task: Runnable,
    ): DelayedTask = DelayedTask(task).apply { start(delayMs) }

    /** Called by [DelayedTask] to remove itself from our list of pending delayed tasks. */
    private fun removeDelayedTask(task: DelayedTask) {
        val found = synchronized(lock) { delayedTasks.remove(task) }
        if (!found) Log.e(tag, "Delayed task not found.")
    }

    /**
     * Verifies that the current thread is the managed [TaskScheduler] thread.
     *
     * @throws RuntimeException if called from any other thread.
     */
    private fun verifyIsCurrentThread() {
        val current = Thread.currentThread()
        val expected = executor.thread
        if (expected !== current) {
            throw RuntimeException(
                "We are running on the wrong thread. Expected to be on the $tag thread " +
                    "${expected.name}/${expected.id} but was ${current.name}/${current.id}"
            )
        }
    }

    /**
     * Immediately stops running any scheduled tasks and causes a "panic" (through crashing the
     * app) by throwing from a runnable posted to the main looper.
     * Should only be used for unrecoverable exceptions.
     *
     * @param t The Throwable that caused the panic.
     */
    // TODO: Check If this is required, if not, handle it properly
    private fun panic(t: Throwable?) {
        executor.shutdownNow()
        val handler = Handler(Looper.getMainLooper())
        handler.post {
            if (t is OutOfMemoryError) {
                // OOMs can happen if developers try to load too much data at once. Instead of
                // treating this as an internal error, give a hint that this might be due to
                // excessive queries in TaskScheduler.
                val error = OutOfMemoryError("$tag ran out of memory. Check your queries to make sure they are not loading an excessive amount of data.")
                error.initCause(t)
                throw error
            } else {
                throw RuntimeException("Internal error in $tag.", t)
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment