作者:blue

v 信号: Mojitok8275

版权声明:本文图文为博主原创,转载请注明出处。

文章似乎有些标题党的嫌疑,但是我相信根据我的理解画出两幅图可以让大家理解 RxJava2 的核心原理,稍后不要吝啬,请叫我灵魂画手😄!相信 RxJava 是大家业务中用到比较多的一个依赖库,RxJava 的强大之处在于它改变了程序员的编程习惯,相比较其他的开源项目,Rxjava 是最弯弯绕的一个。对于 RxJava 种类繁多的操作符,大多数同学都表示很是头疼,也有不少同学陷入了学习操作符不能停的怪圈。操作符要不要学,当然要,但是如果能理解 RxJava 的核心,操作符的使用就像是学会九阳神功的张无忌学招数,必定是手到擒来。所谓器欲尽其用,必先得其法。

这篇文章我会讲些什么

  • RxJava2 基本的运行流程
  • RxJava2 线程切换的原理(涉及到为什么 subscribeOn() 只有第一次调用时有效)
  • 为什么一订阅就回调了 onSubscribe
  • 为什么 subscribeOn() 对上面的代码生效,observerOn() 对下面代码生效

以下内容如果涉及到自己写的代码我会采用 Kotlin 进行示例展示,涉及到 RxJava2 会展示部分源码。

1、简单的链式调用(无线程切换)

先来看一段示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
Log.d("solart", "subscribe > ${Thread.currentThread().name}")
emitter.onNext("test")
emitter.onComplete()
}
}).flatMap(object : Function<String, Observable<String>> {
override fun apply(t: String): Observable<String> {
return Observable.just(t)
}
}).map(object : Function<String, Int> {
override fun apply(t: String): Int {
return 0
}
}).subscribe(object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
Log.d("solart", "onSubscribe > ${Thread.currentThread().name}")
}

override fun onNext(t: Int) {
Log.d("solart", "onNext > ${Thread.currentThread().name}")
}

override fun onComplete() {
Log.d("solart", "onComplete > ${Thread.currentThread().name}")
}

override fun onError(e: Throwable) {
Log.d("solart", "onError > ${Thread.currentThread().name}")
}
})

这段代码中我们简单用了 createflatMapmap等操作符,进行了流式的数据转换,最后我们通过 subscribe 订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图

rxjava2

点击查看大图

1.1 数据源的包裹

比照着这张图,我们来看一下,首先蓝色虚线部分是我们代码中实际调用的顺序,查看 Observable.create 我们不难发现,此处就是产生了一个 ObservableCreate 实例,

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

如我们图中所示, ObservableCreate 内部包含一个类型为 ObservableOnSubscribe<T>source 变量,根据我们代码中的调用,这个 source 就是我们 Kotlin 代码中的匿名对象 object : ObservableOnSubscribe<String>

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
...
}
...
}

我们顺着代码的调用顺序,继续看一下 flatMap 的方法中又做了什么:

1
2
3
4
5
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize)
{

...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

类似的产生了一个 ObservableFlatMap 实例,而其内部持有一个类型为 ObservableSource<T>source 变量,而该 source 则是上一步中的 ObservableCreate 实例,依次我们看 map 依然是类似的代码,这里不在赘述,所以到此我们得到了图中蓝色虚线部分的内容,这个过程可以看作是一个将数据源层层打包的过程。

1.2 逆向订阅数据源

我们知道以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe 时(图中上半部分红色实线部分)才真正触发了 RxJava 的数据流,我们来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

// 重点!!! 发生订阅的核心方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
throw npe;
}
}

根据我们上面的分析,执行 subscribeActual 的对象其实是 ObservableMap ,我们来看它的 subscribeActual 的实现

1
2
3
4
5
6
7
8
9
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
...

@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
...
}

注意,此时产生了一个 MapObserver 对象,MapObserver 中通过 actual 持有了我们自己的匿名对象 object : Observer<Int> ,同样的,ObservableMap 执行 subscribeActual 又调用了上层的 source.subscribe ,依次逆向调用,就得到了我们图中上半部分的红线内容,这个过程我们可以称之为数据源的逆向订阅,这个过程同样也是一个层层打包的过程,只不过它打包的对象换成了观察者 Observer。

1.3 触发数据源产生原始数据,数据流转

当订阅发生在最顶层时,也就是 ObservableCreate 中的 subscribeActual ,此时触发了数据源的产生,通过 emitter 发射数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class ObservableCreate<T> extends Observable<T> {
...
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); //此时触发了 onSubscribe 回调,这里先提一下

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}

而我们代码中此时产生了真正的数据

1
2
3
4
5
override fun subscribe(emitter: ObservableEmitter<String>) {
Log.d("solart", "subscribe > ${Thread.currentThread().name}")
emitter.onNext("test")
emitter.onComplete()
}

此时我们再来看 CreateEmitter 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t); //向下层分发数据
}
}
...
}

根据我们上面的分析 CreateEmitter 中持有的 observer 即是 FlatMapObserver 的实例,而 FlatMapObserver 调用 onNext 时,又会调用 MapObserver 的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。

观察我们这个图,你会发现,操作符 对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符 ,例如 ObservableMap = Observable + map,观察者命名大多遵循 操作符 + Observer ,例如 FlatMapObserver = flatMap + Observer。除了命名规则外,我们观察整个流程,你也会发现有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分),另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)。

2、异步事件流编程(线程切换)

相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。

同样的我们还是先给出一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
Log.d("solart", "subscribe > ${Thread.currentThread().name}")
emitter.onNext("test")
emitter.onComplete()
}
}).subscribeOn(Schedulers.io())
.map(object : Function<String, Int> {
override fun apply(t: String): Int {
return 0
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Int> {

override fun onSubscribe(d: Disposable) {
Log.d("solart", "onSubscribe > ${Thread.currentThread().name}")
}

override fun onNext(t: Int) {
Log.d("solart", "onNext > ${Thread.currentThread().name}")
}

override fun onComplete() {
Log.d("solart", "onComplete > ${Thread.currentThread().name}")
}

override fun onError(e: Throwable) {
Log.d("solart", "onError > ${Thread.currentThread().name}")
}

})

这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:

rxjava2_with_scheduler

点击查看大图

图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。

2.1 逆向订阅时触发 subscribeOn 的线程切换

根据我们第一部分的分析,我们知道 RxJava 有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹,另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹,虽然我们在代码调用过程中使用了线程切换(subscribeOn 和 observerOn)这两个特殊的操作符,在整个流程中依然遵循了这两个包裹封装的过程,只不过它的特殊之处在于处理时完成了流程上的线程切换。

我们来看订阅时(图中⑦⑧的流程)切换线程的 ObservableSubscribeOn 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
}

在逆向订阅的流程中,通过指定 SchedulerSubscribeTask 任务交给线程池处理,我们先来看一下 SubscribeTask 的代码,就是执行了订阅:

1
2
3
4
5
6
7
8
9
10
11
12
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent); // 仅仅订阅了一下
}
}

我们再来看 scheduler.scheduleDirect() 中是如何做到线程切换的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class Scheduler {
...
@NonNull
public abstract Worker createWorker(); // 实现类中实现
...
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker(); // 创建一个 worker

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit); //执行任务

return task;
}
...
}

我们示例中是切换到了 io 线程,所以我们对应的看一下 IoScheduler 的部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public final class IoScheduler extends Scheduler {
...
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}

...

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

static final class ThreadWorker extends NewThreadWorker {
...
// 此处粘贴了了父类中的实现
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

...

Future<?> f;
try {
// 线程池执行任务
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
...
}

return sr;
}
}
}

综合上面的代码,我们来总结一下,其实 ObservableSubscribeOn 本身就是在 subscribeActual 中将上层数据源在异步线程中执行订阅,这样就完成了线程的切换,后续的流程都会在这个切换后的线程中执行,直到再次切换线程。因为 RxJava 本身是逆向订阅的流程,所以这里就解释了两个问题:1、为什么 subscribeOn() 对上面的代码生效?2、为什么 subscribeOn() 只有第一次调用时有效?归根结底都是因为逆向订阅的流程决定了 subscribeOn 是在订阅流程中起作用,此时数据还未产生,而在代码上第一个 subscribeOn 其实是逆向订阅流程的最后一个线程切换的地方,这个将会对生产原始数据所在线程产生直接影响 。这里还有一点要提一下,ObservableSubscribeOn 在执行 subscribeActual 时,回调了下层产生的 ObserveronSubscribe,如图中的④⑤⑥流程,所以这也是为什么,在观察者一订阅后就会在当前订阅的线程收到 onSubscribe 的回调的原因。

2.2 正向数据流触发 observerOn 的线程切换

同第一部分一样的,订阅到最上层时,触发数据源产生原始数据,从而又正向的流转数据,此过程我们不在详细分析,参照1.3,我们着重看一下 ObserveOnObserver 的 onNext 处理的逻辑,也就是图中步骤⑮⑯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
...
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {


...
final Observer<? super T> actual;
final Scheduler.Worker worker;
...
@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule(); //类似 ObservableSubscribeOn.subscribeActual() 异步线程执行
}
...
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
...
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
...
}
}

示例中我们此时切换到了 Main 线程中执行,我们来看对应的 HandlerScheduler 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
...
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}

@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}

private static final class HandlerWorker extends Worker {
...
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.

if (async) {
message.setAsynchronous(true);
}

handler.sendMessageDelayed(message, unit.toMillis(delay));

...

return scheduled;
}
}

从代码中我们可以看到,此时将 Runnable 通过 Handler 发到了住线程去执行,所以经过此步骤后,后续的 onNext 的处理已经切换为主线程。同样的,这里也解释了一开始我们提到的另一个问题:为什么 observerOn() 对下面代码生效?正是因为,数据的流向决定了 observerOn() 对后续的 onNext 产生影响。

总结

至此 RxJava 运转机制我们已经分析完毕,大家可以比照图中流程,跟踪代码调用关系,相信会有很大收获。 RxJava 本身是一个变种的观察者模式,正是因为框架本身要实现异步事件流编程,所以产生了逆向订阅的过程,同时数据又是正向流转的,这个过程中大家还需要理解两个包裹封装(被观察者、观察者)的过程,不管操作符怎么变换,都不会脱离这样的运作核心。

另外根据不同的操作符的实现,我们依照同样的模式,可实现自己的自定义操作符,只要能在订阅时和数据回流时做好上下层的衔接就好,这个大家可以自己实践。