给 Android 开发者的 RxJava 详解(http://gank.io/post/560e15be2dca930e00da1083)
1.1 rxjava源码分析:
但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJava的实现原理。本文我们主要从三个方面来分析RxJava的实现:
本章节基于RxJava1.1.9版本的源码
在RxJava系列2(基本概念及使用介绍)中我们介绍过,一个最基本的RxJava调用是这样的:
示例A
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava!");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("completed!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
首先调用Observable.create()创建一个被观察者Observable,同时创建一个OnSubscribe作为create()方法的入参;接着创建一个观察者Subscriber,然后通过subseribe()实现二者的订阅关系。这里涉及到三个关键对象和一个核心的方法:
首先我们来看看Observable.create()的实现:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
这里创建了一个被观察者Observable,同时将RxJavaHooks.onCreate(f)作为构造函数的参数,源码如下:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
我们看到源码中直接将参数RxJavaHooks.onCreate(f)赋值给了当前我们构造的被观察者Observable的成员变量onSubscribe。那么RxJavaHooks.onCreate(f)返回的又是什么呢?我们接着往下看:
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
由于我们并没调用RxJavaHooks.initCreate(),所以上面代码中的onObservableCreate为null;因此RxJavaHooks.onCreate(f)最终返回的就是f,也就是我们在Observable.create()的时候new出来的OnSubscribe。(由于对RxJavaHooks的理解并不影响我们对RxJava执行流程的分析,因此在这里我们不做进一步的探讨。为了方便理解我们只需要知道RxJavaHooks一系列方法的返回值就是入参本身就OK了,例如这里的RxJavaHooks.onCreate(f)返回的就是f)。
至此我们做下逻辑梳理:Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe。
接着我们看下观察者Subscriber的源码,为了增加可读性,我去掉了源码中的注释和部分代码。
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList subscriptions;//订阅事件集,所有发送给当前Subscriber的事件都会保存在这里
...
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
...
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
public void onStart() {
}
...
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
Subscriber实现了Subscription接口,从而对外提供isUnsubscribed()和unsubscribe()方法。前者用于判断是否已经取消订阅;后者用于将订阅事件列表(也就是当前观察者的成员变量subscriptions)中的所有Subscription取消订阅,并且不再接受观察者Observable发送的后续事件。
前面我们分析了观察者和被观察者相关的源码,那么接下来便是整个订阅流程中最最关键的环节了。
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
subscribe()方法中将传进来的subscriber包装成了SafeSubscriber,SafeSubscriber其实是subscriber的一个代理,对subscriber的一系列方法做了更加严格的安全校验。保证了onCompleted()和onError()只会有一个被执行且只执行一次,一旦它们其中方法被执行过后onNext()就不在执行了。
上述代码中最关键的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。这里的RxJavaHooks和之前提到的一样,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二个入参observable.onSubscribe,也就是当前observable的成员变量onSubscribe。而这个成员变量我们前面提到过,它是我们在Observable.create()的时候new出来的。所以这段代码可以简化为onSubscribe.call(subscriber)。这也印证了我在RxJava系列2(基本概念及使用介绍)中说的,onSubscribe.call(subscriber)中的subscriber正是我们在subscribe()方法中new出来的观察者。
到这里,我们对RxJava的执行流程做个总结:首先我们调用crate()创建一个观察者,同时创建一个OnSubscribe作为该方法的入参;接着调用subscribe()来订阅我们自己创建的观察者Subscriber。 一旦调用subscribe()方法后就会触发执行OnSubscribe.call()。然后我们就可以在call方法调用观察者subscriber的onNext(),onCompleted(),onError()。
最后我用张图来总结下之前的分析结果:
之前我们介绍过几十个操作符,要一一分析它们的源码显然不太现实。在这里我抛砖引玉,选取一个相对简单且常用的map操作符来分析。
我们先来看一个map操作符的简单应用:
示例B
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "This is " + integer;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
为了便于表述,我将上面的代码做了如下拆解:
Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
});
Subscriber<String> subscriberOne = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};
Observable<String> observableB =
observableA.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "This is " + integer;;
}
});
observableB.subscribe(subscriberOne);
map()的源码和上一小节介绍的create()一样位于Observable这个类中。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
通过查看源码我们发现调用map()的时候实际上是创建了一个新的被观察者Observable,我们姑且称它为ObservableB;一开始通过Observable.create()创建的Observable我们称之为ObservableA。在创建ObservableB的时候同时创建了一个OnSubscribeMap,而ObservableA和变换函数Func1则作为构造OnSubscribeMap的参数。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;//ObservableA
final Func1<? super T, ? extends R> transformer;//map操作符中的转换函数Func1。T为转换前的数据类型,在上面的例子中为Integer;R为转换后的数据类型,在该例中为String。
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {//结合第一小节的分析结果,我们知道这里的入参o其实就是我们自己new的观察者subscriberOne。
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;//这里的actual就是我们在调用subscribe()时创建的观察者mSubscriber
final Func1<? super T, ? extends R> mapper;//变换函数
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}