Created
May 18, 2021 10:39
-
-
Save clydebarrow/0cdd53754096942f8b242f9b0e64b2d4 to your computer and use it in GitHub Desktop.
Main thread scheduler for RxJava on iOS. Used with RoboVM.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package reactivex.ios.schedulers | |
import io.reactivex.rxjava3.core.Scheduler | |
import org.robovm.apple.foundation.NSOperationQueue | |
import reactivex.ios.plugins.RxIosPlugins | |
/** | |
* RoboVM iOS-specific Schedulers. | |
*/ | |
/**
 * Factory for RoboVM iOS-specific [Scheduler]s.
 *
 * This class only acts as a namespace: every entry point lives on the companion object and the
 * class itself can never be instantiated.
 */
class IosSchedulers private constructor() {
    init {
        // Belt and braces on top of the private constructor: any instantiation attempt fails.
        throw AssertionError("No instances.")
    }

    /**
     * Holds the default scheduler backed by the main [NSOperationQueue]. It is only read from the
     * callable handed to [RxIosPlugins.initMainThreadScheduler], so the [QueueScheduler] is built
     * only if that callable is actually invoked.
     */
    private object MainHolder {
        val DEFAULT: Scheduler = QueueScheduler(NSOperationQueue.getMainQueue())
    }

    companion object {
        // Resolved once through the plugin hook, which may substitute its own scheduler.
        private val MAIN_THREAD: Scheduler =
            RxIosPlugins.initMainThreadScheduler { MainHolder.DEFAULT }

        /**
         * A [Scheduler] which executes actions on the iOS main operation queue.
         *
         * The scheduler is passed through [RxIosPlugins.onMainThreadScheduler] on every call, so
         * an installed handler may replace it.
         */
        @JvmStatic
        fun mainThread(): Scheduler = RxIosPlugins.onMainThreadScheduler(MAIN_THREAD)

        /**
         * A [Scheduler] which executes actions on [queue].
         *
         * @param queue the operation queue that should run the scheduled actions
         * @return a new scheduler bound to [queue]
         * @throws NullPointerException if [queue] is `null`
         */
        @JvmStatic
        fun from(queue: NSOperationQueue?): Scheduler =
            QueueScheduler(queue ?: throw NullPointerException("queue == null"))
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package reactivex.ios.schedulers | |
import io.reactivex.rxjava3.core.Scheduler | |
import io.reactivex.rxjava3.disposables.Disposable | |
import io.reactivex.rxjava3.plugins.RxJavaPlugins | |
import org.robovm.apple.dispatch.DispatchQueue | |
import org.robovm.apple.foundation.NSBlockOperation | |
import org.robovm.apple.foundation.NSOperationQueue | |
import java.util.concurrent.TimeUnit | |
/**
 * A [Scheduler] that runs tasks as [NSBlockOperation]s on an [NSOperationQueue].
 *
 * Every task is first handed to the main [DispatchQueue] via `after(delay, unit, ...)`; when that
 * callback fires, the task is added to [operationQueue] as a block operation.
 */
internal class QueueScheduler(private val operationQueue: NSOperationQueue) : Scheduler() {
    // Only used for timing the delay; the work itself runs on operationQueue.
    private val dispatchQueue: DispatchQueue = DispatchQueue.getMainQueue()

    /**
     * Schedules [run] once after [delay] without creating a worker.
     *
     * @return a [Disposable] that cancels the task when disposed
     */
    override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val scheduled = ScheduledRunnable(operationQueue, RxJavaPlugins.onSchedule(run))
        dispatchQueue.after(delay, unit, scheduled)
        return scheduled
    }

    /** Creates a worker whose tasks share this scheduler's operation and dispatch queues. */
    override fun createWorker(): Worker = QueueWorker(operationQueue, dispatchQueue)

    /**
     * Worker that schedules tasks on [queue]. Every task it creates keeps a reference to this
     * worker, so disposing the worker also suppresses tasks that are still waiting for their
     * delay or for their turn on the operation queue.
     */
    private class QueueWorker(
        private val queue: NSOperationQueue,
        private val dispatchQueue: DispatchQueue
    ) : Worker() {
        @Volatile
        private var disposed = false

        /**
         * Schedules [run] after [delay].
         *
         * @return the task's [Disposable], or an already-disposed one if this worker is disposed
         */
        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (disposed) return Disposable.disposed()

            // Passing `this` as parent links the task's lifetime to the worker's.
            val scheduled = ScheduledRunnable(queue, RxJavaPlugins.onSchedule(run), this)
            dispatchQueue.after(delay, unit, scheduled)

            // Re-check disposed state in case we were racing a call to dispose().
            if (disposed) {
                scheduled.dispose()
                return Disposable.disposed()
            }
            return scheduled
        }

        override fun dispose() {
            // Pending tasks observe this flag through their parent reference and skip execution.
            disposed = true
        }

        override fun isDisposed(): Boolean = disposed
    }

    /**
     * A single scheduled task. It is a [Runnable] so it can be handed to the dispatch queue for
     * the delay, and a [Disposable] so callers can cancel it.
     *
     * @param queue operation queue the task is enqueued on once the delay has elapsed
     * @param delegate the actual work to run
     * @param parent optional owner (a worker); when it is disposed this task counts as disposed
     */
    private class ScheduledRunnable(
        private val queue: NSOperationQueue,
        private val delegate: Runnable,
        private val parent: Disposable? = null
    ) : Runnable, Disposable {
        // The operation wraps runDelegate() rather than the raw delegate so that disposal is
        // honoured and task failures are reported at execution time.
        private val blockOperation: NSBlockOperation = NSBlockOperation(Runnable { runDelegate() })

        @Volatile
        private var disposed = false

        /** Invoked by the dispatch queue; enqueues the operation unless already disposed. */
        override fun run() {
            if (isDisposed) return
            try {
                queue.addOperation(blockOperation)
            } catch (t: Throwable) {
                reportFatal(t)
            }
        }

        /** Invoked on the operation queue; runs the delegate and reports anything it throws. */
        private fun runDelegate() {
            // The task or its worker may have been disposed while the operation was queued.
            if (isDisposed) return
            try {
                delegate.run()
            } catch (t: Throwable) {
                reportFatal(t)
            }
        }

        /** Routes [t] to the RxJava error hook and the current thread's uncaught handler. */
        private fun reportFatal(t: Throwable) {
            val ie = IllegalStateException("Fatal Exception thrown on Scheduler.", t)
            RxJavaPlugins.onError(ie)
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, ie)
        }

        override fun dispose() {
            disposed = true
            // Presumably keeps a not-yet-started operation from running; the flag check in
            // runDelegate() covers the case regardless.
            blockOperation.cancel()
        }

        override fun isDisposed(): Boolean = disposed || parent?.isDisposed == true
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package reactivex.ios.plugins; | |
import io.reactivex.rxjava3.core.Scheduler; | |
import io.reactivex.rxjava3.exceptions.Exceptions; | |
import io.reactivex.rxjava3.functions.Function; | |
import java.util.concurrent.Callable; | |
/** | |
* Utility class to inject handlers to certain standard RxAndroid operations. | |
*/ | |
public final class RxIosPlugins { | |
private static volatile Function<Callable<Scheduler>, Scheduler> onInitMainThreadHandler; | |
private static volatile Function<Scheduler, Scheduler> onMainThreadHandler; | |
public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) { | |
onInitMainThreadHandler = handler; | |
} | |
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) { | |
if (scheduler == null) { | |
throw new NullPointerException("scheduler == null"); | |
} | |
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler; | |
if (f == null) { | |
return callRequireNonNull(scheduler); | |
} | |
return applyRequireNonNull(f, scheduler); | |
} | |
public static void setMainThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) { | |
onMainThreadHandler = handler; | |
} | |
public static Scheduler onMainThreadScheduler(Scheduler scheduler) { | |
if (scheduler == null) { | |
throw new NullPointerException("scheduler == null"); | |
} | |
Function<Scheduler, Scheduler> f = onMainThreadHandler; | |
if (f == null) { | |
return scheduler; | |
} | |
return apply(f, scheduler); | |
} | |
/** | |
* Returns the current hook function. | |
* @return the hook function, may be null | |
*/ | |
public static Function<Callable<Scheduler>, Scheduler> getInitMainThreadSchedulerHandler() { | |
return onInitMainThreadHandler; | |
} | |
/** | |
* Returns the current hook function. | |
* @return the hook function, may be null | |
*/ | |
public static Function<Scheduler, Scheduler> getOnMainThreadSchedulerHandler() { | |
return onMainThreadHandler; | |
} | |
/** | |
* Removes all handlers and resets the default behavior. | |
*/ | |
public static void reset() { | |
setInitMainThreadSchedulerHandler(null); | |
setMainThreadSchedulerHandler(null); | |
} | |
static Scheduler callRequireNonNull(Callable<Scheduler> s) { | |
try { | |
Scheduler scheduler = s.call(); | |
if (scheduler == null) { | |
throw new NullPointerException("Scheduler Callable returned null"); | |
} | |
return scheduler; | |
} catch (Throwable ex) { | |
throw Exceptions.propagate(ex); | |
} | |
} | |
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) { | |
Scheduler scheduler = apply(f,s); | |
if (scheduler == null) { | |
throw new NullPointerException("Scheduler Callable returned null"); | |
} | |
return scheduler; | |
} | |
static <T, R> R apply(Function<T, R> f, T t) { | |
try { | |
return f.apply(t); | |
} catch (Throwable ex) { | |
throw Exceptions.propagate(ex); | |
} | |
} | |
private RxIosPlugins() { | |
throw new AssertionError("No instances."); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment