Created
January 25, 2021 11:09
-
-
Save SergejIsbrecht/f3ad98dd1a07e15cd898acb73210d3bb to your computer and use it in GitHub Desktop.
RxJava
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.reactivex.rxjava3.core; | |
import io.reactivex.rxjava3.annotations.NonNull; | |
import io.reactivex.rxjava3.disposables.Disposable; | |
import io.reactivex.rxjava3.plugins.RxJavaPlugins; | |
import io.reactivex.rxjava3.schedulers.Schedulers; | |
import java.util.Objects; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
class SchedulersTest { | |
public void init() { | |
RxJavaPlugins.setInitSingleSchedulerHandler( | |
schedulerCallable -> Schedulers.from(Executors.newSingleThreadScheduledExecutor())); | |
RxJavaPlugins.lockdown(); | |
// `ScheduledThreadPoolExecutor` -> uses `System.nanoTime() | |
// public long getDelay(TimeUnit unit) { | |
// return unit.convert(time - System.nanoTime(), NANOSECONDS); | |
// } | |
// Scheduler default impl uses `System.currentTimeMillis()` | |
// this could have different behavior with delayed tasks | |
// use-case: System.currentTimeMillis() | |
// update some state every minute. When suspended for 10 minutes and awaken, timer fires, | |
// because of jump | |
// some action is invoked to update a state. When time does not jump, the state might not be | |
// updated for | |
// delta (0 < x < 1) min. | |
// use-case: System.nanoTime() | |
// watchdog which checks some | |
// 1. define different single scheduler | |
// 2. tell every developer that `ScheduledExecutorService` should be used for periodic tasks | |
// Problem: Android specific time which is kind of monotonic but is suspension aware | |
// use-case: not depending on external time changes (System.currentTimeMillis) only forward | |
// monotonic with jumps due to adjustment | |
// ideal for periodic timers, which fire after suspension, when system has been asleep for set | |
// timer-period | |
// what would I do? Well, | |
// 1. use ScheduledExecutorService with `System.nanoTime()` in conjunction with | |
// `Schedulers.from` and have a not adjusted time on Linux_x64_86 | |
// 2. implement own Scheduler (e.g. Single-Threaded-Scheduler) using time-source x | |
// 3. delegation + some copy-pasting + now-overwrite | |
// Problem | |
// different time-sources across: RxJava uses System.currentTimeMillis | | |
// ScheduledExecutorService uses System.nanoTime | |
// lets say I want to schedule tasks periodically with `Executors.newSingleThreadExecutor()` but | |
// instead of `System.currentTimeMillis()` use `SystemClock.elapsedRealtimeNanos()` | |
// Even bigger Problem: | |
// even `Single` uses two different times: either `System.nanoTime` by | |
// `ScheduledExecutorService` and `System.currentTimeMillis`, when using a `worker`. Which is | |
// probably very bad for 'suspension', because `Scheduler.schedulePeriodicallyDirect` will | |
// (probably) not fire after suspension and `Scheduler.Worker.schedulePeriodically` will, | |
// because based upon `System.currentTimeMillis()`. This could lead to very unfortunate bugs. | |
// Problem 2 | |
// `Executors.newSingleThreadExecutor()` will use the default impl, when used in conjunction | |
// with `Schedulers.from`, which in turn uses `System.currentTimeMillis()` | |
RxJavaPlugins.setSingleSchedulerHandler(TimeSchedulerWrapper::new); | |
} | |
static final class TimeSchedulerWrapper extends Scheduler { | |
private final Scheduler scheduler; | |
TimeSchedulerWrapper(Scheduler scheduler) { | |
this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); | |
} | |
@Override | |
public long now(@NonNull TimeUnit unit) { | |
return System.nanoTime(); | |
} | |
@Override | |
public @NonNull Worker createWorker() { | |
return new WorkerWrapper(scheduler.createWorker()); | |
} | |
@Override | |
public void start() { | |
scheduler.start(); | |
} | |
@Override | |
public void shutdown() { | |
scheduler.shutdown(); | |
} | |
} | |
static final class WorkerWrapper extends Scheduler.Worker { | |
private final Scheduler.Worker worker; | |
WorkerWrapper(Scheduler.Worker worker) { | |
this.worker = Objects.requireNonNull(worker, "worker"); | |
} | |
@Override | |
public long now(@NonNull TimeUnit unit) { | |
return System.nanoTime(); | |
} | |
@Override | |
public @NonNull Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { | |
return worker.schedule(run, delay, unit); | |
} | |
@Override | |
public void dispose() { | |
worker.dispose(); | |
} | |
@Override | |
public boolean isDisposed() { | |
return worker.isDisposed(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment