实用 AI

可在线运行 AI 集合,涵盖 AI 文案生成、写作辅助、AI 绘图与照片修复、AI 配音、字幕生成、语音转录以及 AI 视频创作和数字人等多种 AI 服务

查看详情

RxJava的方法链调用是如何工作的?

Android面试技术要点汇总
2021-05-17 14:29 · 阅读时长8分钟
小课

RxJava在Android开发中主要用来管理多线程任务的调度,因为其简洁的链式调用以及方便的线程切换而被广泛开发者喜爱。虽然RxJava使用起来非常简洁,但是并不简单,想要弄懂RxJava内部是如何工作的也并非一件易事。

在RxJava中有很多操作符,有用于创建Observable的、用于变换数据流的、切换执行线程的等等,有一些操作符“长得非常像”对于刚开始学习RxJava的人来说,非常容易弄错。下面我们看看RxJava的使用示例

参考:RxJava操作符

注释
加载中...

这段代码中使用的操作符是我们平时写RxJava经常用到的,当我们调用上面代码时,经过了以下几个阶段

创建数据流 -> 订阅数据流->数据流转->取消订阅

上面提到了RxJava中有很多操作符,有些操作符可能在某一阶段才产生作用,在前面的阶段仅仅是保存起来备用。

一、创建数据源

首先,调用Observable.just方法,创建一个数据为Hey的数据源,通过源码可以看出,实际上是创建了一个ObservableJust对象。

public static <T> Observable<T> just(T item) {
   ObjectHelper.requireNonNull(item, "item is null");
   return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

然后是调用subscribeOn方法设置执行线程,通过源码发现,实际上也只是它创建了一个ObservableSubscribeOn对象。

public final Observable<T> subscribeOn(Scheduler scheduler) {
   ObjectHelper.requireNonNull(scheduler, "scheduler is null");
   //"this" it's top observable
   return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

并且ObservableSubscribeOn对象持有上面ObservableJust对象的引用。

subscribe之前的操作符调用也和这里一样,都会创建一个具体的Observable对象,该对象会持有上一个Observable对象的引用,最后会形成一个Observable的链表。

注意这里仅仅是创建对象,并没有实际去执行流转数据和切换线程。
二、订阅数据流

订阅过程是从subscribe方法开始,自下而上执行,每次操作符都是订阅它的上流,比如subscribe操作符订阅的是flatMap,然后flatMap订阅的是doOnSubscribe,在订阅的过程中,有些操作符中的代码块会被执行,而有些在订阅阶段不会执行。

我们分析一下上面那段RxJava代码在订阅阶段的执行情况。

  1. subscribe{...}订阅上流,执行线程:main。
  2. flatMap{...}订阅上流,执行线程:main。
  3. doOnSubscribe{...}订阅上流,并且执行其中的Lambda表达式,执行线程:main。
  4. observeOn(Schedulers.single())订阅上流,执行线程:main,注意该操作符是设置数据流转阶段的执行线程,目前是订阅数据流阶段,所以这里并未改变线程。
  5. subscribeOn(Schedulers.computation())订阅上流,执行线程:main,并修改执行线程为computation。
  6. map{...}订阅上流,执行线程:computation。
  7. subscribeOn(Schedulers.io())订阅上流,执行线程:computation,并修改执行线程为io。

从上面分析可以看出,我们可以通过subscribeOn(...)来改变订阅数据流时的线程环境,并且可以改变多次,但是如果每次切换的是同一个线程环境,则前面的两次切换将被忽略,比如:

Observable.just("Hey")
     .doOnSubscribe { doAction() }
     .subscribeOn(Schedulers.io()) //忽略
     .doOnSubscribe { doAction() }
     .subscribeOn(Schedulers.io()) //忽略
     .doOnSubscribe { doAction() }
     .subscribeOn(Schedulers.io()) //有效
     .subscribe { }

主要影响包括订阅操作的执行线程和doOnSubscribe的执行线程以及后面数据流转阶段的执行线程。

三、数据流转阶段

当订阅阶段执行到Observable.just("Hey")时,发现它没有上流,所以从它开始,从上往下进行数据流转。在数据流订阅阶段最后执行的线程是io,所以流转的开始线程也是io,具体执行情况如下:

  1. Observsble.just("Hey")将数据传到下流,执行线程:io。
  2. subscribeOn(Schedulers.io())透传数据给下流,执行线程:io,注意该操作符是设置数据流订阅阶段的执行线程,目前是数据流转阶段,所以这里并未改变线程。
  3. map{...}执行其中的Lambda表达式,并把表达式结果传到下流,执行线程:io。
  4. subscribeOn(Schedulers.computation())透传数据给下流,执行线程:io。
  5. observeOn(Schedulers.single()),修改执行线程为single,并将数据传到下流。
  6. doOnSubscribe透传数据给下流,执行线程:single。
  7. flatMap{...}执行其中的Lambda表达式,并把表达式结果传到下流:执行线程:single。

需要注意的是,这里面的RxJava代码同样会经历上面的几个阶段

创建数据流 -> 订阅数据流->数据流转

但是不会经历取消订阅阶段,因为这里还需要把数据传到下流,并且在这里最终设置的执行线程会影响外部的执行线程。运行上面代码最后线程被切换到computation了,但是代码中却没有这个逻辑,在查看Observable.timer(...)源码发现,它内部会把线程切换到computation。

public static Observable<Long> timer(long delay, @NonNull TimeUnit unit) {
    return timer(delay, unit, Schedulers.computation());
}

8.subscribe{...}执行其中的Lambda表达式,执行线程:computation。

四、总结

RxJava方法链调用经历了以下三个阶段,首先从上往下创建数据流,再从下往上订阅数据流,最后再从上往下流转数据,需要注意的是subscribeOn(...)是改变数据流订阅阶段,其以上流订阅时执行的线程,而observeOn(...)是改变数据流转阶段,其以下流的执行线程。

RxJavaAndroid