RxJava 链式调用原理

RxJava 采用了类似 Stream API 的链式调用设计,提供了 filter(), map(), observeOn() 等大家经常使用的操作符。与 Builder 模式对调用方法的顺序没有要求不同,RxJava 的操作符调用需要保持顺序关系。在本篇文章中,我们来解析一下 RxJava 是如何实现这些链式调用的。

首先列举一个常见的 RxJava 场景:

Observable.create(...)
.filter(...)
.map(...)
.observeOn(...)
.subscribe(...)

链式调用从 subscribe() 开始被触发,我们来看一下对应的源码:

public abstract class Observable<T> implements ObservableSource<T> {
...

public final void subscribe(Observer<? super T> observer) {
...

try {
...

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}

protected abstract void subscribeActual(Observer<? super T> observer);
}

可以看到 subscribe() 实际调用的是 subscribeActual() 的具体实现,而 Observable 的子类有 ObservableFilter, ObservableMap, ObservableObserveOn 等。聪明的你肯定想到了,这些子类显然与对应的操作符有关。以 filter() 为例:

public final Observable<T> filter(Predicate<? super T> predicate) {
...
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}

可以看到,每调用一次 filter() 都会将上层的 Observable 包装成一个新的 ObservableFilter, 对应的我们最初的例子的调用栈实际上是这样的:

ObservableObserveOn.subscribeActual() { // 最后被执行
ObservableMap.subscribeActual() { // 第三个被执行
ObservableFilter.subscribeActual() { // 第二个被执行
ObservableCreate.subscribeActual() { // 嵌套最深的最先被执行
// DO SOMETHING
}
}
}
}

至于每个 Observable 都是如何实现的,这里就不展开了,其中涉及 Java 的静态代理,感兴趣的同学可以参见这个回答 Java 动态代理作用是什么?