-文章来源:itsCoder 的 WeeklyBolg 项目
- itsCoder主页:http://itscoder.com/
- 作者:yongyu0102
- 审阅者:hymane
说在前面,本文较长,从观察者模式到 RxJava 原理,以及 RxJava 的一些常用操作符的使用,并且对主要涉及到的源码进行了详细分析,讲解内部实现原理,都是笔者自己的学习笔记,所以你可以选择感兴趣的地方去阅读,如果你有时间也可以慢慢品尝,希望对你有所帮助!
一、初识 RxJava
RxJava 是什么 :它就是一个实现异步操作的库,使你的程序逻辑简介清晰实现链式调用,避免代码的迷之嵌套以及各种接口回调。
扩展的观察者模式:RxJava 的异步实现,是通过一种扩展的观察者模式来实现的,观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。Android 开发中一个比较典型的例子是点击监听器 OnClickListener
。对设置OnClickListener
来说, View 是被观察者, OnClickListener
是观察者,二者通过 setOnClickListener()
方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的OnClickListener
。
RxJava 中重要概念
Observable:被观察者,这个类提供一系列方法用于被 Observers 去订阅,即在 RxJava 中 一个 Observer 观察者去 subscribe 订阅一个 Observable 被观察者,Observable 决定事件触发的时候将有怎样的行为,即事件的产生者。
Observer: 观察者身份,用于观察 Observable,接受被观察者发送的事件,下面这段原文说的很形象:
|
|
大概意思是:在一个观察者 Observer 调用 (calls )一个被观察者 Observable 的 subscribe 方法之后,这个被观察者就会调用(calls )观察者的 onNext()
方法来发送消息。
subscribe:动词订阅,执行订阅,用于 Observer 去订阅 Observable,使二者之间建立联系。
最后三者之间的关系:Observable 和 Observer 通过 subscribe()
方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
订阅之后结果:在 Observer 观察者 subscribe 订阅了被观察者 Observaber 之后会产生 onCompleted(表示事件完成)、onNext(接受事件产生的结果)、onError(表示事件产生错误)。
onCompleted(): 表示事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列,RxJava 规定,当不会再有新的 onNext()
发出时,需要触发 onCompleted()
方法作为结束标志。
onNext(): 接受发送的事件,即接受数据。
onError(): 事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。在一个正确运行的事件序列中,onCompleted()
和 onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是 onCompleted()
和 onError()
二者是互斥的,即在队列中调用了其中一个,就不再调用另一个。
RxJava 的观察者模式大致如下图:
Subscriber:Subscriber 对 Observer 接口进行了一些扩展,
|
|
也是观察者(订阅者),他的基本使用方式与 Observer 是完全一样的,在订阅者即 Subscriber 调用了被观察者 Observabler 的方法 subscribe 之后,被观察者 Observable 将会调用 Subscriber’s 的方法 onNext 发送事件,而且在事件发送完毕会调用 Subscriber 的 onCompleted 方法或者在发送事件过程中出现错误就会调用 Subscriber 的 onError 方法。
|
|
而且这是一个抽象类,使用的时候必须实现其抽象方法,不可以直接 new ,可以使用匿名内部类的方式进行 new ,如下:
|
|
二、Observable 创建的几种方式及源码
2.1 Observable.create(new Observable.OnSubscribe()
|
|
看一下 OnSubscribe 这个类的源码:
|
|
看一下 Observable.create
源码
|
|
直接调用 了Observable 构造方法,只是将 OnSubscribe 参数进行了一层的包装,下面看一下如何包装的,这里的 hook 对象为 RxJavaObservableExecutionHook 类,是 RxJavaPlugins 中的一个类,用于插入一些你所需要的代码,记录,测试等,在默认的情况下,没有做任何对代码逻辑功能有影响的事情,以下是官方文档给出的解释:
This plugin allows you to register functions that RxJava will call upon certain regular RxJava activities, for instance for logging or metrics-collection purposes.
hook.onCreate(f)
源码如下:
|
|
大家看一下,这里直接返回了传入的参数,所以说这个类没做对业务逻辑有影响的事情,其他调用也类似,只是做了个包装,所以我们在分析源码思路的时候可以忽略其作用。那么接着看 Observable 构造函数干了什么:
|
|
很简单,直接保存全局持有创建的 onSubscribe 对象。这里被观察者创建源码就这么简单,分析完毕。下面看一下,我们实例化观察者 Observable 对象做了什么:
|
|
一看就这么简单,就是一个接口,里面是我们实例化时候需要重写的几个方法,大家都很熟悉。
下面看一下订阅 Observable.subscribe(observer)
方法干了什么?
|
|
将 Observer 包装成 Subscriber代码:
|
|
这里也很简单,没什么好说的,接着看 subscribe 方法:
|
|
该方法直接调用了两个参数的 subscribe 方法,而传递进去的参数一个是我们创建的观察者 subscriber (Observer) ,一个是被观察者自己本身 Observable 即参数 this ,接着看:
|
|
2.2 Observable.just(……)
用法如下:
|
|
源码:
|
|
源码可以看出, 在 just()
方法内部直接将传入的不固定个数的参数直接转换为一个数组,然后传递给 from()
方法,那么我们看一下 from()
方法的用法:
2.3 Observable.from(T[] array)
|
|
用法很简单,看源码:
|
|
从源码可以看出,这个方法的作用就是将一个数组转变为一个能够发送数组元素的 Observable 对象。
根据传入的数组长度分为三种情况进行调用,我们一起分析下:
第一情况,数组长度为 0:
这种情况调用了 empty()
方法,即数据为空,这种情况最终会调用 EmptyObservableHolder 类的 call()
方法,而 EmptyObservableHolder 继承自 OnSubscribe ,重写了 call()
方法:
|
|
很明显,如果数组内元素个数为 0,那么直接调用了 Subscriber 的 onCompleted()
方法完成数据发送。
第二情况,数组长度为1,源码如下:
|
|
接着看 ScalarSynchronousObservable.create()
源码:
|
|
看 ScalarSynchronousObservable 含一个参数的构造方法源码:
|
|
这个方法很关键,ScalarSynchronousObservable 这个类继承自 Observable 类,所以
super(hook.onCreate(new JustOnSubscribe<T>(t)))
就是调用了 Observable 含有一个参数的构造方法,然后看一下传入的参数即 hook.onCreate(new JustOnSubscribe<T>(t))
这个方法 返回的对象,这里的 hook
就是我们前面说过的 RxJavaObservableExecutionHook 类,是RxJavaPlugins中的一个类,用于插入一些你所需要的代码,记录,测试等,最终直接返回了传入的参数,没做对业务逻辑有作用的事情,所以 super(hook.onCreate(new JustOnSubscribe<T>(t)));
方法我们就可以简化为 new Observer( new JustOnSubscribe<T>(t))
,即直接 new 一个 Observable 对象,传入一个 OnSubscribe 参数,这个结果和我们前面分析的直接创建 Observable 对象的方法Observable.create(OnSubscribe<T> f)
执行结果是一样的,即这个方法最终其实还是调用了我们前面直接使用的方法,豁然开朗。
|
|
那么再看一下这个 JustOnSubscribe 类:
|
|
这个类继承自 OnSubscribe 类,并重写了 call()
方法,这里先看一下s.setProducer(createProducer(s, value))
这个方法:
|
|
这个方法的作用就是根据 STRONG_MODE 参数和传入的 Subscriber 参数和 泛型参数 T 创建一个数量发生器(Producer,是一个接口,它只有一个 request()
方法,用来在 Observable 和 Subscriber 直接创建一个请求信道,允许 Subscriber 向 Observable 请求确定个数的事件,这个确定的数量将会影响调用 Observer.onNext(Object)
方法,这样可以限制请求,一般实现该接口的类,都会包含一个 Subscriber 对象和一个待处理的数据,createProducer(s, t)
方法中,s 是一个 Subscriber 对象,t 是一个待处理的参数,可以在Producer 中先对 t 进行相应的处理随后,再将数据传送给 Subscriber ,STRONG_MODE 为引用模式,默认为 false,那么就会执行 new WeakSingleProducer<T>(s, v)
,看一下这个方法:
|
|
这个类里面主要方法就是 request(long n)
方法,而该方法的作用就是只执行一遍 Subscriber 的 onNext()
和 onCompleted()
方法,来发送一次数据并结束订阅过程。
再看一下 Subscriber 的 setProducer(Producer p)
方法:
|
|
通过源码可以看出 setProducer(Producer p)
方法主要完成的任务有:给 Subscriber 对象的 Producer 赋值,调用 producer.request()
方法,这样就完成了数据的发送。而上面那些个 if 语句判断情况,其实方法注释已经写的很清楚,我这里简单翻译下:如果设定了其他的 subscriber (通过调用构造函数) ,那么这个方法将会执行 subscriber.setProducer(producer)
方法,注意这里是调用你设定那个其他 subscriber 的 setProducer(producer)
方法 ;如果没有设定其他的 subscriber 并且 现在这个 subscriber 没有设定限定请求个数(toRequest == NOT_SET) ,那么 producer.request(Long.MAX_VALUE)
方法将会调用;如果设定了其他 subscriber 并且限制了请求事件个数(toRequest != NOT_SET),那么 producer.request(toRequest)
方法将得到执行。
第三情况,数组长度大于 1:
调用代码为 create(new OnSubscribeFromArray<T>(array))
,直接调用 Observable 类的 create(OnSubscribe<T> f)
方法,这个构造方法前面我们分析过,所以直接看 new OnSubscribeFromArray<T>(array)
方法,OnSubscribeFromArray 这个类实现了 OnSubscribe 类,我们先看这个类的构造方法源码:
|
|
这个构造方法很简单,就是将传递进来的参数保存为成员变量,既然 OnSubscribeFromArray 这个类实现了 OnSubscribe 类,我们肯定要去看一下重写的 call(Subscriber<? super T> child)
方法:
|
|
这里 调用的 Subscriber 的 setProducer(Producer p)
方法前面我们分析过,所以直接看 new FromArrayProducer<T>(child, array)
方法,FromArrayProducer 这个类继承自 Producer 类,先看一下构造方法:
|
|
构造方法就是将传递进来的 Subscriber 对象和数组 array 保存为成员变量。再看一下重写的 request(long n)
方法:
|
|
这个方法内部调用分了三种情况:第一种当 请求数量 n 小于 0 的时候直接抛出一个异常;第二种当 请求数量 n == Long.MAX_VALUE 的时候,首先进行了 BackpressureUtils.getAndAddRequest(this, n) == 0
判断,这行代码的作用是采用 CAS 操作模式将数量 n 赋值给 request 如果操作成功则返回原始值,这个原始值是 0,即返回值为 0,代表操作成功了,其中 CAS 操作模式主要应用在 Java 并发编程,大家可以 Google 了解下,然后看一下调用的 fastPath()
方法代码:
|
|
这个方法也很简单,就是直接遍历数组 array 并调用 Subscriber 的 onNext(t)
发送数据,最后调用 onCompleted()
方法结束发送;第三种情况调用 slowPath(n)
方法源码如下:
|
|
这个方法主要作用其实还是遍历数组 array 并调用 Subscriber 的 onNext(t)
发送数据,最后调用 onCompleted()
方法结束发送,只不过是添加了请求限制个数限制条件的各种判断。
2.4 Observable.map(Func1<? super T, ? extends R> func)
map()
函数使用如下:
|
|
下面看一下源码:
|
|
这个方法主要作用就是得到一个新的 Observable 对象, 将原始的 Observable 发送的对象添加一个功能处理一下再将处理后的对象发送出去;看一下方法传递的参数 Func1 类源码:
|
|
在 RxJava 中除了有 Func1 还有 Func2 等,其实 FuncX 就是对有参数且有返回值的一类方法的包装而已,将T类型的数据转换为R类型数据。OperatorMap 类源码:
|
|
OperatorMap 类实现了 Operator 类,而 Operator 类实现了 Func1 类,OperatorMap 内部主要是重写了 call 方法,注意这里看着像是将 SubscriberonNext(T t)
方法,这些转换都是在这个重写的 onNext(T t)
方法中进行 ,所以当我们调用 OperatorMap. call(final Subscriber<? super R> o)
得到 返回的 MapSubscriberparent.onNext()
方法 就会完成将输入类型 T 转换为 R类型并发送出去的效果,一起看一下 MapSubscriber 这个类:
|
|
这个类内部也很简单,主要是重写了 Subscriber 的 onNext(T t)
方法, 就是通过构造方法传递进来的 Func1<? super T, ? extends R> 对象将输入类型 T 转换为 R类型,然后在通过传递进来的 Subscriber<? super R> 的 onNext(R)
将转换后得到的结果 R 发送出去。
最后看一下 lift(final Operator<? extends R, ? super T> operator)
方法源码:
|
|
这个方法的主要作用就是对当前的 Observable 对象进行一个功能变化,并返回一个新的 Observable 对象,当这个新的 Observable 对象被订阅之后,就可以通过这个 Operator 对象的功能变换来发送当前 Observable 对象的数据。换句话说。这个方法通过在一个特定的 Observable 内部使得观察者 Observers 和 被观察者 Observable 来接发数据形成关联。lift 方法内部直接调用了 Observable 的构造函数创建一个 Observable 并返回,而这里传入的参数 OnSubscribe 和 Operator 我们前面已经分析过,下面只要看 OnSubscribeLift 这个类:
|
|
这个类继承自 OnSubscribe 重写了 call 方法,在 call 方法内部通过调用 operator.call(o) 方法得到一个新的 Subscriber,最后将这个 Subscriber 传递给原始 OnSubscribe 的 call 方法,到这里就完成了整个转换操作,剩下的就是我们在 2.1 章节分析过的 Observable.subscribe 方法部分了,即在执行 map 函数转换之后得到 Observable
对象变化
这里面最后总结一下,其实 RxJava 这个 lift (包括其他 map 、flatMap) 操作符就是完成各种对象的变换,而变换主要涉及到的就是 Subscriber、Observable、OnSubscribe 这三个对象的各种转换。
Observable的变化
每个操作符都会新建一个 Observable 和一个新建的 OnSubscribe (下游Observable和下游 OnSubscribe );
下游 OnSubscribe 中持有上游的 OnSubscribe;
下游的 OnSubscribe 先调用 Operator 拿到针对上游的 Subscriber,然后就可以调用上游OnSubscribe.call() 方法了。
流程图(代码分解):当 Subscribe 订阅时代码执行流程:
2.5 Observable.flatMap(Func1<? super T, ? extends Observable<? extends R>> func)
|
|
2.6 Compose 操作符
在说 compose之前要先介绍下Transformer
Transformer 实际上就是一个Func1, Observable>
,换言之就是:可以通过它将一种类型的 Observable 转换成另一种类型的 Observable,比如通过 Transformer 将Observable<T>
转换成了Observable<R>
,compose()
和 lift()
的区别在于,lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。这个功能其实 flatmap 函数也能实现,但是 compose 操作符实现了代码重用,只需要写一个 transfrmer 就可以利用 compose 操作符实现反复利用,不用每次都写 flatmap 写重复代码,源码:
|
|
以下代码可以将 Obsaverble
|
|
使用示例:
|
|
2.7 变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)
|
|
这个变换原理我们在分析 map 函数的时候已经分析过了,总结起来就是通过新建一个 Obsaverble 和 OnSubscribe 来发送数据,而发送的数据是通过 Operator<? extends R, ? super T> operator 来实现变换的。主要分为以下几步:
1.在执行lift()
后会创建一个新的 Observable 我们标记为 Observable2,加上之前的原始 Observable 我们标记为 Observable1,现在有两个 Observable ;
2.在创建新 Observable2 的时候会创建一个新的 OnSubscribe2 标记为 OnSubscribe2 , 加上之前的原始 Observable1 中的原始 OnSubscribe1,也就有了两个 OnSubscribe;
3.当用户调用 lift()
后 再去调用 subscribe()
的时候,其实是使用的 lift()
所返回的新的 Observable2 ,于是它所触发的 onSubscribe2.call(subscriber1)
,即在 lift()
中生成的那个 OnSubscribe2;
4.而这个新 OnSubscribe2 的 call()
方法中会持有原始的 onSubscribe1 ,就是指的原始 Observable1 中的原始 OnSubscribe1 ,在这个 call()
方法里,用 operator.call(subscriber1)
生成了一个新的 Subscriber2,Operator 就是在这里,通过自己的 call()
方法将新 Subscriber2 和原始 Subscriber1 进行关联,并插入自己的『变换』代码以实现变换,然后利用这个新 Subscriber2 向原始 Observable1 进行订阅。 这样就实现了 lift()
过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift()
包装方法(如 map()
flatMap()
等)进行组合来实现需求,因为直接使用 lift()
非常容易发生一些难以发现的错误。
下面这是一个将事件中的 Integer
对象转换成 String
的例子:
|
|
以上是本次学习笔记内容,完结!
参考文章: