RxJava在Android开发中主要用来管理多线程任务的调度,因为其简洁的链式调用以及方便的线程切换而被广泛开发者喜爱。虽然RxJava使用起来非常简洁,但是并不简单,想要弄懂RxJava内部是如何工作的也并非一件易事。
在RxJava中有很多操作符,有用于创建Observable的、用于变换数据流的、切换执行线程的等等,有一些操作符“长得非常像”对于刚开始学习RxJava的人来说,非常容易弄错。下面我们看看RxJava的使用示例
参考:RxJava操作符
注释这段代码中使用的操作符是我们平时写RxJava经常用到的,当我们调用上面代码时,经过了以下几个阶段
创建数据流
-> 订阅数据流
->数据流转
->取消订阅
上面提到了RxJava中有很多操作符,有些操作符可能在某一阶段才产生作用,在前面的阶段仅仅是保存起来备用。
首先,调用Observable.just
方法,创建一个数据为Hey
的数据源,通过源码可以看出,实际上是创建了一个ObservableJust
对象。
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
然后是调用subscribeOn
方法设置执行线程,通过源码发现,实际上也只是它创建了一个ObservableSubscribeOn
对象。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//"this" it's top observable
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
并且ObservableSubscribeOn
对象持有上面ObservableJust
对象的引用。
在subscribe
之前的操作符调用也和这里一样,都会创建一个具体的Observable
对象,该对象会持有上一个Observable
对象的引用,最后会形成一个Observable
的链表。
注意这里仅仅是创建对象,并没有实际去执行流转数据和切换线程。
订阅过程是从subscribe
方法开始,自下而上执行,每次操作符都是订阅它的上流,比如subscribe
操作符订阅的是flatMap
,然后flatMap
订阅的是doOnSubscribe
,在订阅的过程中,有些操作符中的代码块会被执行,而有些在订阅阶段不会执行。
我们分析一下上面那段RxJava代码在订阅阶段的执行情况。
subscribe{...}
订阅上流,执行线程:main。flatMap{...}
订阅上流,执行线程:main。doOnSubscribe{...}
订阅上流,并且执行其中的Lambda表达式,执行线程:main。observeOn(Schedulers.single())
订阅上流,执行线程:main,注意该操作符是设置数据流转阶段的执行线程,目前是订阅数据流阶段,所以这里并未改变线程。subscribeOn(Schedulers.computation())
订阅上流,执行线程:main,并修改执行线程为computation。map{...}
订阅上流,执行线程:computation。subscribeOn(Schedulers.io())
订阅上流,执行线程:computation,并修改执行线程为io。从上面分析可以看出,我们可以通过subscribeOn(...)
来改变订阅数据流时的线程环境,并且可以改变多次,但是如果每次切换的是同一个线程环境,则前面的两次切换将被忽略,比如:
Observable.just("Hey")
.doOnSubscribe { doAction() }
.subscribeOn(Schedulers.io()) //忽略
.doOnSubscribe { doAction() }
.subscribeOn(Schedulers.io()) //忽略
.doOnSubscribe { doAction() }
.subscribeOn(Schedulers.io()) //有效
.subscribe { }
主要影响包括订阅操作的执行线程和doOnSubscribe
的执行线程以及后面数据流转阶段的执行线程。
当订阅阶段执行到Observable.just("Hey")
时,发现它没有上流,所以从它开始,从上往下进行数据流转。在数据流订阅阶段最后执行的线程是io,所以流转的开始线程也是io,具体执行情况如下:
Observsble.just("Hey")
将数据传到下流,执行线程:io。subscribeOn(Schedulers.io())
透传数据给下流,执行线程:io,注意该操作符是设置数据流订阅阶段的执行线程,目前是数据流转阶段,所以这里并未改变线程。map{...}
执行其中的Lambda表达式,并把表达式结果传到下流,执行线程:io。subscribeOn(Schedulers.computation())
透传数据给下流,执行线程:io。observeOn(Schedulers.single())
,修改执行线程为single,并将数据传到下流。doOnSubscribe
透传数据给下流,执行线程:single。flatMap{...}
执行其中的Lambda表达式,并把表达式结果传到下流:执行线程:single。需要注意的是,这里面的RxJava代码同样会经历上面的几个阶段
创建数据流
-> 订阅数据流
->数据流转
但是不会经历取消订阅阶段,因为这里还需要把数据传到下流,并且在这里最终设置的执行线程会影响外部的执行线程。运行上面代码最后线程被切换到computation了,但是代码中却没有这个逻辑,在查看Observable.timer(...)
源码发现,它内部会把线程切换到computation。
public static Observable<Long> timer(long delay, @NonNull TimeUnit unit) {
return timer(delay, unit, Schedulers.computation());
}
8.subscribe{...}
执行其中的Lambda表达式,执行线程:computation。
RxJava方法链调用经历了以下三个阶段,首先从上往下创建数据流,再从下往上订阅数据流,最后再从上往下流转数据,需要注意的是subscribeOn(...)
是改变数据流订阅阶段,其以上流订阅时执行的线程,而observeOn(...)
是改变数据流转阶段,其以下流的执行线程。