RxJava 源码解读

开源库地址:https://github.com/ReactiveX/RxJava
解读版本:1.1.8

基本概念

RxJava 是一个基于Java的响应式扩展实现: 即一个通过使用可观察序列来编写异步和基于事件的程序库。
它扩展了观察者模式以支持数据/事件序列,您可以根据声明好的规则通过操作符将序列组合在一起,而不用去担心低级别的线程,同步,线程安全和并发数据结构的各种问题。

基本用法

前面既然说了RxJava扩展了观察者模式,也就是说,RxJava是采用观察者模式实现的。既然是观察者模式,那么一定需要两个东西,被观察者和观察者。

怎么初始化一个观察者?(以下例子以订阅String类型为例子)
我们可以直接使用Observer来初始化,Observer是一个接口,里面有onNext,onCompleted,onError三个抽象方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> observer=new Observer<String>() {
@Override
public void onCompleted() {
//正常终止时调用,onError和onCompleted只会通知一个
}

@Override
public void onError(Throwable e) {
//当Observable遇到错误或者无法返回期望的数据时会调用这个方法,后续不会再调用onNext和onCompleted
}

@Override
public void onNext(String s) {
//Observable调用这个方法发射数据
}
};

此外我们可以使用Observer的子类Subscriber来初始化。Subscriber相对于Observer增加了onStartunsubscribe,事实上,即使你使用的是Observer,在内部仍然会被包装为Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Subscriber<String> subscriber=new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
};

怎么初始化一个被观察者?
初始化被观察者使用Observable,在call内进行处理事件。可以看出call的参数为Subscriber,这也进一步证实了Observer会被包装为Subscriber。只要被观察者调用call方法,订阅者就可以接受到事件/数据了。

1
2
3
4
5
6
7
8
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("welcome to china");
subscriber.onCompleted();
}
});

那么问题来了,怎么建立一个订阅关系?
只需被观察者调用Observable.subscribe(Subscriber)即可。
于是整个流程是这样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 //被观察者
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("welcome to china");
subscriber.onCompleted();
}
});

//观察者
Observer<String> observer=new Observer<String>() {
@Override
public void onCompleted() {
Log.d("JG","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d("JG","onError");
}

@Override
public void onNext(String s) {
Log.d("JG",s);
}
};

//被观察者订阅观察者(实际应理解为观察者订阅被观察者。)
observable.subscribe(observer);

运行结果如下。
image_1alc28ectmnu1nev16p11ibb1vs59.png-12.6kB

此外,该库还有非常完善的异常捕获机制,当在处理数据时发生异常,可以自动捕获并回调到onError中。将observable修改成如下后,进行测试:

1
2
3
4
5
6
7
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//...
throw new RuntimeException();//模拟抛出异常
}
});

测试结果如下:
image_1app9gp8iic0pbg1f7ilad1dhh9.png-4.1kB

OK,基本用法已经介绍完了,接下来,本篇将深入源码内部一探究竟(除了操作符,下篇会对操作符进行完全解析,如果只对操作符感兴趣敬请期待下篇)。如果你只是刚接触RxJava,请看给 Android 开发者的 RxJava 详解这篇文章。

源码解读

观察者(Subscriber)

Subscriber提供了一种从被观察者接收推送数据和通知以及从被观察者中取消订阅的机制。
实现了Observer和Subscription接口,在Observer的基础上加入了onStart生命周期方法。该类属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private static final long NOT_SET = Long.MIN_VALUE;//未设置请求数量
private final SubscriptionList subscriptions;//订阅列表,一个存放订阅者(实现了Subscription接口)的列表,以便一同解除订阅
private final Subscriber<?> subscriber;//订阅者

private Producer producer;//生产者(用来处理反压),被观察和观察者之间的请求通道

private long requested = NOT_SET; //请求数, 默认为Long.MIN_VALUE。

protected Subscriber() {
this(null, false);
}

protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
//是否共享订阅列表
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
//..
//省略了部分源码
}

从源码可以看出Subscriber内部包含了一个子Subscriber,可以共享同一个订阅列表。Producer用于建立被观察者和观察者间的可以指定请求数量的请求通道,一般用来配合解决反压问题(backpressure)。Producer是一个函数式接口,里面就一个抽象方法void request(long n);SubscriptionList是一个订阅列表,以便将多个订阅者一起取消订阅。
关于设置Producer和请求数的源码如下:
request(long n)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final void request(long n) {
if (n < 0) {//必须大于等于0
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}

Producer producerToRequestFrom = null;
synchronized (this) {
if (producer != null) {
producerToRequestFrom = producer;
} else {
//没有producer的话就保存到Subscriber的requested值,会逐渐累加。
addToRequested(n);
return;
}
}
//如果有producer就直接调用Producer的request。
producerToRequestFrom.request(n);
}

setProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 
public void setProducer(Producer p) {
long toRequest;//请求数
boolean passToSubscriber = false;//是否传递给子Subscriber
synchronized (this) {
toRequest = requested;//赋值之前保存的请求数
producer = p;//赋值给producer
if (subscriber != null) {//内部Subscriber不为空
if (toRequest == NOT_SET) {//且当前的Subscriber未设置请求数
passToSubscriber = true;//标识传递给内部Subscriber
}
}
}

if (passToSubscriber) {//当前Subscriber未设置请求数且内部Subscriber不为空
subscriber.setProducer(producer);//就把Producer也赋值给内部的Subscriber
} else {


if (toRequest == NOT_SET) {
//内部Subscriber为空且请求数未设置,就默认最大。
producer.request(Long.MAX_VALUE);
} else {
//当前请求数已经设置,就给Producer。
producer.request(toRequest);
}
}
}

上述源码中会涉及到一个Producer到底给谁的问题。首先,如果在设置请求数时还没有初始化Producer,就进行累加保存。直到Producer被设置时,如果当前Subscriber未设置请求数且内部Subscriber不为空就把Producer赋值给内部的Subscriber,否则就会赋值给当前的Subscriber。要是当前Subscriber至今未设置请求数,就请求Long.MAX_VALUE数量的数据,多余部分就会忽略。

被观察者(Observable)

说到Observable,有必要来认识一下其他几个接口。
先来看下图:
image_1alj8i1ub14ijermlrpnkflm9.png-16.9kB

Funtion

Funtion是个空接口,里面没有任何实现。FuncN相关接口则常用于类型转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//Func0 0个参数,返回R类型
public interface Func0<R> extends Function, Callable<R> {
@Override
R call();
}
//Func1 1个T类型参数,返回R类型
public interface Func1<T, R> extends Function {
R call(T t);
}
//Func2 两种类型参数,返回R类型
public interface Func2<T1, T2, R> extends Function {
R call(T1 t1, T2 t2);
}

//...

//FuncN N种类型参数,返回R类型
public interface FuncN<R> extends Function {
R call(Object... args);
}

Action

Action接口继承于Function的空接口,ActionN相关接口源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//Action0 传入0个参数
public interface Action0 extends Action {
void call();
}
//Action1 传入1个参数
public interface Action1<T> extends Action {
void call(T t);
}
//Action2 传入2个参数
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}

//...

//ActionN 传入N个参数
public interface ActionN extends Action {
void call(Object... args);
}

ActionN和FuncN的区别在于ActionN没有返回值而FuncN有返回值。因此,FuncN常用来进行类型转换,ActionN用于回调数据。

OnSubscribe

OnSubscribe继承于Action1,用于提供Subscriber对象。
OnSubscribe源码如下。

1
2
3
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {

}

Operator

操作符函数,继承于Func1,用于变换Subscriber,R表示下游的值,T表示上游的值。

1
2
3
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {

}

Transformer

与Operator一样,同样继承于Func1,只不过该函数用于变换Observable。T表示原先的值,R表示变换后的值。

1
2
3
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {

}

整个Observable中的链式操作以及线程切换大都依赖以上接口。
还记得开头时介绍Observable.create创建被观察者的那句代码吗?不难发现,只要我们触发call方法,Subscriber就可以开始工作了。
image_1alj9e198f3bdm1jab1m16mjvm.png-21.9kB
此外,还可以通过from,just等等来初始化Observable,这就就不一一赘述了。

订阅(subscribe)

在了解了观察者和被观察者后,我们就来了解下如何进行订阅。订阅相关的方法有很多。但是每一种都会最后都会被封装为Subscriber.
image_1aqol5tvc1vp1r9vv6p1bk4ha29.png-19.3kB

Observer通过ObserverSubscriber使用代理的方式包装Observer。
Action通过ActionSubscriber来进行包装。
最终都会调用一下方法:

1
2
3
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

Observable.subscribe中才是订阅的核心,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//..
//省略了部分源码

subscriber.onStart();//这里调用onStart

//包装成SafeSubscriber(保障onError/onComplete只会执行一次)
if (!(subscriber instanceof SafeSubscriber)) {

subscriber = new SafeSubscriber<T>(subscriber);
}


try {
//RxJavaHooks 执行钩相关,用于拦截并修改对象
//使用钩子拦截并修改onSubscribe(如果没有注册钩子的话,默认不会做修改)
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
//...
//省略了错误处理相关的源码
subscriber.onError(RxJavaHooks.onObservableError(e));//处理异常相关
}
return Subscriptions.unsubscribed(); //返回一个
}
}

当调用subscribe时,首先Subscriber执行onStart。并且将Subscriber包装为SafeSubscriber,用于保证onError和onComplete只执行一次,即使你写了多个onError/onComplete也不会回调多次。此外,只要执行了onError/onComplete还会进行解除订阅操作,也就是说将不能再发送数据。RxJavaHooks则是钩子相关,用于拦截修改对象,如果有需求的话可以进行注册钩子,通过RxJavaPlugins#registerXXXExecutionHook来注册相关执行钩,源码在plugins包下。
image_1aqomk4sa1rj311pm19r61au8171am.png-22.6kB
如果没有注册钩子,默认会直接返回,不会修改任何对象。

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);这句源码如果没有注册钩子的话,会直接返回OnSubscribe,然后调用call方法,Observable便开始发送数据项了。RxJavaHooks.onObservableReturn用于返回Subscription进行解除订阅。如果在订阅的过程中发生了异常,则会回调到onError中。不过需要注意的是,并不是所有的异常都会回调onError,而是会直接抛出异常。

以下列表中的异常如果发生是一定会被抛出的。
image_1aqon9fnt10q9k3qpa9peh15sn13.png-56.6kB

线程切换相关(Scheduler)

RxJava中一提及线程,那么无疑就会联想到Scheduler这个叫做调度器的东西。
Scheduler的种类有如下几种:

调度器类型 效果
Schedulers.computation() 用于计算任务,如事件循环等,不要用于IO操作。默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate() 在当前线程立即开始执行任务
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,线程池会根据需要增长。
Schedulers.newThread() 为每个任务创建一个新线程
Schedulers.trampoline() 在当前线程排队执行(如果队列中已经存在其他任务)
AndroidSchedulers.from(Looper) 切换到Looper所在的线程执行(Android)
AndroidSchedulers.mainThread() 切换到主线程线程执行(Android)

Scheduler的作用不言而喻,其是一个抽象类,核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public abstract class Scheduler {

public abstract Worker createWorker(); //创建Worker


//Worker抽象类
public abstract static class Worker implements Subscription {

//执行一个Action,返回Subscription
public abstract Subscription schedule(Action0 action);

//延时执行一个Action,返回Subscription
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);


//延时周期性执行一个Action,返回Subscription,使用递归来实现周期性执行
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
// MultipleAssignmentSubscription用于检查订阅状态,常用于循环内。
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();

//递归Action
final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
long startInNanos = firstStartInNanos;
@Override
public void call() {
action.call();

if (!mas.isUnsubscribed()) {

//..
//省略了计算下一次执行时间的源码

mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
};
MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
s.set(schedule(recursiveAction, initialDelay, unit));

mas.set(s);

return mas;
}

now() {
return System.currentTimeMillis();
}
}

看到这里,或许你还是很好奇Worker中的线程调度吧?那我们就来看一个最简单的源码。即Schedulers.newThread( )的NewThreadScheduler相关源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
public final class NewThreadScheduler extends Scheduler {
private final ThreadFactory threadFactory;

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}

整体源码还是非常简洁的,NewThreadWorker继承于Worker,是核心所在。对应的抽象方法实现如下。其中executor为ScheduledExecutorService,因此可以用于执行延时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}


public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); //钩子

ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
//提交到线程执行
f = executor.submit(run);
} else {
//提交到线程延时执行
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

阅读完以上源码,相信对于线程的切换有了深入的了解了吧,用过Worker#schedule相关方法就可以切换到指定线程中进行执行。其他Scheduler在internal包中。这里就不一一介绍了。
image_1aqqornshvri1q8i5b81j461lfj9.png-33.2kB

下面来看看Scheduler在Rxjava中的相关应用:

observeOn切换线程

observeOn方法用于指定下游Subscriber回调发生的线程,observeOn方法最终都会调用如下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//scheduler 线程调度器
//delayError 是否延迟等到全部发送完数据后才发射错误
//bufferSize 指定缓存大小
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
//ScalarSynchronousObservable是一个用于发送单个常量值的Observable
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}

return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//创建一个新的Observable并返回
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

OperatorObserveOn用于转换Subscriber。lift用于根据Operator来转换Subscriber然后创建新的Observable。如果你传入的是ImmediateScheduler、TrampolineScheduler这种在当前线程执行的Scheduler,为了避免开销将会被直接忽略。其他线程则返回ObserveOnSubscriber。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
//包装为ObserveOnSubscriber,转换Subscriber
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}

ObserveOnSubscriber继承了Action0接口,支持反压控制,可以配置是否延迟等到全部发送完数据后才发射错误通知,可以配置队列大小。ObserveOnSubscriber内部的核心方法如下:可以看出,onError/onComplete是一个赋值操作,onNext只是将数据添加到队列中,然后全部都会通过schedule来调取发射数据或通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {//加入等待队列中,当生产速率大于消费速率时抛出异常,即队列满了。
onError(new MissingBackpressureException());
return;
}
schedule();//调度
}

@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}

@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {//利用counter来计数调用了多少次schedule()。
//recursiveScheduler为Worker
recursiveScheduler.schedule(this);
}
}

ObserveOnSubscriber继承了Action0,因此,recursiveScheduler.schedule(this);中一定会调用call方法。源码如下:无非就是从队列中取出数据进行发射,如果指定了延迟发送异常通知,那么将会等到队列中的数据全部发完才会发送错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted; //当前的发射数

final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;


for (;;) {
long requestAmount = requested.get();//获取当前的请求数

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();//从队列中取出数据,
boolean empty = v == null; //队列为空

if (checkTerminated(done, empty, localChild, q)) {//检查是否已经发送结束
return;
}

if (empty) {
break;
}

localChild.onNext(localOn.getValue(v)); //调用原始Subscriber发送数据

currentEmission++;//当前发射数+1
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}

转换完Subscriber过后,接下来将会通过lift操作符来创建下游Observable。OnSubscribeLift的核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void call(Subscriber<? super R> o) {

//首先调用Operator转换新Subscriber
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

//模拟订阅操作
//调用新Subscriber#onStart
st.onStart();

//parent为上游Observable#onSubscribe
parent.call(st); //调用onSubscribe的call方法,开始发送数据。
}

subscribeOn切换线程

subscribeOn方法用于指定subscribe()所发生的线程,相关源码如下:

1
2
3
4
5
6
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}

从源码可以看出,通过Observable#create方法又创建了一个新Observable。而OperatorSubscribeOn实现了OnSubscribe接口。用于在Scheduler中订阅观察者,核心源码自然在call方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker(); //创建Worker
subscriber.add(inner); //加入订阅列表中,方便解除订阅
//调用Worker#schedule方法,切换到Worker线程中执行任务
inner.schedule(new Action0() {
@Override
public void call() {
//获取Worker线程
final Thread t = Thread.currentThread();
//创建新Subscriber
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

//保证在Worker线程中使用Producer
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
//在Worker线程中 使用source(上游Observable)订阅(非安全)新的Subscriber。
source.unsafeSubscribe(s);
}
});
}

整体流程如下,首先通过Scheduler来创建Worker,通过Worker执行Action0,Worker是一个抽象类。我们可以自己实现在子线程中工作的Worker,比如NewThreadWorkerThreadWorker以及PoolWorker都是在子线程中工作的Woker。在Action0的内部new了个新的Subscriber,然后让上游的Observable订阅。由于我们的订阅是发生在Worler线程中的,因此这样子就可以达到线程切换的要求。

Single

Single是一种特殊的Observable。不同于Observable,它每次只能发送一个值。因此它所对应的订阅者为SingleSubscriber。SingleSubscriber与Subscriber不同的地方在于它只有两个抽象方法。

1
2
3
4
 //发送成功时回调
public abstract void onSuccess(T value);
//发送失败时回调
public abstract void onError(Throwable error);

此外,因为每次订阅只能发送一个值,也不会产生反压问题。因此SingleSubscriber中也没有定义Producer来处理反压。

将Single转为Observable也很简单。

1
2
3
4
5
6
7
8
public final Observable<T> toObservable() {
return asObservable(this);
}

private static <T> Observable<T> asObservable(Single<T> t) {
//传入single中已桥接好的onSubscribe即可。
return Observable.create(t.onSubscribe);
}

你可能会好奇,到底是怎么实现桥接的。在Single的构造方法中做了如下桥接,以便能够实现Observable和Single之间的转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected Single(OnSubscribe<T> f) {
//f为Single.OnSubscribe<T>
final OnSubscribe<T> g = RxJavaHooks.onCreate(f);

//桥接OnSubscribe为Observable.OnSubscribe<T>
this.onSubscribe = new Observable.OnSubscribe<T>() {

@Override
public void call(final Subscriber<? super T> child) {
//通过SingleDelayedProducer来控制Subscriber只能发送一个值
final SingleDelayedProducer<T> producer = new SingleDelayedProducer<T>(child);
child.setProducer(producer);

SingleSubscriber<T> ss = new SingleSubscriber<T>() {

@Override
public void onSuccess(T value) {
producer.setValue(value);//如果有请求,内部会触发next()来发送数据,否则保存数据等待请求。
}

@Override
public void onError(Throwable error) {
child.onError(error);
}

};

child.add(ss);
g.call(ss);
}

};
}

通过SingleDelayedProducer,一旦接受到值,通过setValue,将会触发发射数据到Subscriber中。

Completable

同样是一个特殊的Observable。但Completable并不关心发送的数据,而只关心发送成功与否。

Subject

Subject比较特殊,它既是一个Observable,也是一个Observer。

SubjectSubscriptionManager

Subject订阅管理器,实现了OnSubscribe接口,用来保存最近一次的值或者发射完毕的值。内部用过State来管理所有的订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    //当订阅者订阅前回调
Action1<SubjectObserver<T>> onStart = Actions.empty();
//当订阅者订阅时回调(没有发射完毕)
Action1<SubjectObserver<T>> onAdded = Actions.empty();
//当订阅者订阅时,如果已经是停止状态时回调。
Action1<SubjectObserver<T>> onTerminated = Actions.empty();

@Override
public void call(final Subscriber<? super T> child) {
SubjectObserver<T> bo = new SubjectObserver<T>(child);
addUnsubscriber(child, bo);//添加到订阅列表方便解除订阅。
onStart.call(bo);
if (!child.isUnsubscribed()) {
if (add(bo) && child.isUnsubscribed()) {//add 添加到State
remove(bo);
}
}
}

//add 添加到State
boolean add(SubjectObserver<T> o) {
do {
State oldState = get();
if (oldState.terminated) {//如果已经停止发射,就触发onTerminated
onTerminated.call(o);
return false;
}
State newState = oldState.add(o);//添加到State保存
if (compareAndSet(oldState, newState)) {
onAdded.call(o);//回调add
return true;
}
} while (true);
}

AsyncSubject

只发射来自原始Observable的最后一个值。以下例子只会接收到three。如果没有调用onCompleted,那么将不会接收到任何值,因为没法判断是否已经全部发送完毕。

1
2
3
4
5
6
7
8
9
// observer 将会只接受到 three。
AsyncSubject<Object> subject = AsyncSubject.create();
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();

}

核心实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static <T> AsyncSubject<T> create() {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
//创建AsyncSubject时,先配置好onTerminated,当已经停止且又有新的订阅者订阅时触发
state.onTerminated = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
Object v = state.getLatest();
NotificationLite<T> nl = state.nl;
if (v == null || nl.isCompleted(v)) {
o.onCompleted();
} else
if (nl.isError(v)) {
o.onError(nl.getError(v));
} else {
o.actual.setProducer(new SingleProducer<T>(o.actual, nl.getValue(v)));
}
}
};
return new AsyncSubject<T>(state, state);
}

//调用onNext用于赋值
@Override
public void onNext(T v) {
lastValue = nl.next(v);
}
//调用onCompleted时,调用state.terminate设置最近值并获取订阅列表。如果是
@Override
public void onCompleted() {
if (state.active) {
Object last = lastValue;
if (last == null) {
last = nl.completed();
}
for (SubjectObserver<T> bo : state.terminate(last)) {
if (last == nl.completed()) {
bo.onCompleted();
} else {
//请求终止值
bo.actual.setProducer(new SingleProducer<T>(bo.actual, nl.getValue(last)));
}
}
}
}

BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(在还没有收到任何数据前,我们可以指定发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
if (hasDefault) {
state.setLatest(NotificationLite.instance().next(defaultValue));
}
//添加订阅者是触发
state.onAdded = new Action1<SubjectObserver<T>>() {

@Override
public void call(SubjectObserver<T> o) {
//发送最近的一个值
o.emitFirst(state.getLatest(), state.nl);
}

};
state.onTerminated = state.onAdded;
return new BehaviorSubject<T>(state, state);
}

@Override
public void onNext(T v) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = nl.next(v);
for (SubjectObserver<T> bo : state.next(n)) {
bo.emitNext(n, state.nl);//发送下一个值
}
}
}

@Override
public void onCompleted() {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = nl.completed();
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n, state.nl);
}
}
}

PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

1
2
3
4
5
6
7
8
9
PublishSubject<Object> subject = PublishSubject.create();
// observer1 将会收到所有的数据和通知
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 只能收到 "three" 和 onCompleted通知
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。。

1
2
3
4
5
6
7
8
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();
// observer1/observer2可以收到所有的数据和通知
subject.subscribe(observer1);
subject.subscribe(observer2);

我们可以指定一定大小或者时间,当缓存超过指定大小或者超过指定时间将会被丢弃。通过不同的create方法,可以创建不同类型的ReplaySubject。
image_1arcg8d13111k1df31ohe56d1lcp9.png-21.4kB

1
2
3
4
5
6
7
8
9
10
11
//以下例子中,observer1收到two,three,four,onCompleted
//observer2收到three,four,onCompleted
ReplaySubject<String> replaySubject=ReplaySubject.createWithTimeAndSize(100, TimeUnit.MILLISECONDS,2, Schedulers.immediate());
replaySubject.onNext("one");
Thread.sleep(100);
replaySubject.onNext("two");
replaySubject.subscribe(observer1);
replaySubject.onNext("three");
replaySubject.onNext("four");
replaySubject.onCompleted();
replaySubject.subscribe(observer2);

SerializedSubject

包装了Subject,内部通过SerializedObserver,使其成为线程安全的对象,以便我们可以在多个线程发送数据而不会导致顺序错乱。

最后

由于RxJava的内容比较多,许多细节和内容无法在文中一一赘述,比如RxJava大量运用的代理模式,线程并发处理相关等等。关于操作符的内容准备下篇单独做个系统的剖析。此外,由于本人水平问题,部分地方理解的不够透彻,如有错误之处,欢迎指出。


本期解读到此结束。下一期,RxJava操作符完全解析。