/**
 * Worker entry point for scheduling a task with a delay.
 *
 * The visible tail of the body forwards {@code action}, {@code delayTime} and {@code unit}
 * to {@code scheduleActual}, passing {@code null} as the parent container, and returns
 * whatever that call returns.
 *
 * NOTE(review): part of the body is elided ("...") in this excerpt; confirm what runs
 * before the delegation.
 */
@NonNull @Override public Disposable schedule( @NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit){ ... return scheduleActual(action, delayTime, unit, null); }
@NonNull public ScheduledRunnable scheduleActual( final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent){ Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
...
Future<?> f; try { if (delayTime <= 0) { // 直接提交 f = executor.submit((Callable<Object>) sr); } else { // 延迟调度 f = executor.schedule((Callable<Object>) sr, delayTime, unit); }
While ScheduledThreadPoolExecutor inherits from ThreadPoolExecutor, a few of the inherited tuning methods are not useful for it. In particular, because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect.
public final class SingleScheduler extends Scheduler { final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
...
@NonNull @Override public Worker createWorker(){ returnnew ScheduledWorker(executor.get()); }
@NonNull @Override public Disposable scheduleDirect( @NonNull Runnable run, long delay, TimeUnit unit){ ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); try { Future<?> f; if (delay <= 0L) { // 直接提交 f = executor.get().submit(task); } else { // 延时调度 f = executor.get().schedule(task, delay, unit); }
// Async will only be true when the API is available to call. @Override @SuppressLint("NewApi") public Disposable schedule(Runnable run, long delay, TimeUnit unit){ ...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); // Used as token for batch disposal of this worker's runnables. message.obj = this;