RxJava介绍及封装

RxJava介绍

RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
使用时,在gradle中引入依赖

1
2
compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0'

RxJava特点

RxJava是一个异步库,那么为什么不用现成的AsyncTask/Handle/Thread这些?
1、简洁,RxJava是一条从上到下的链式调用,当需求逻辑变得更复杂时,RxJava依然可以保持简洁
2、轻量,无依赖库,jar包小于1M
3、支持多语言,兼容所有JVM语言,如Java、Clojure、Groovy、Scala等
4、多线程支持,封装了各种并发实现,如threads, pools, event loops, fibers, actors
5、基于有条件的异步执行
6、更好的避免回调地狱,Observables 能够组合而不是嵌套,从而避免开发者陷入回调地狱

RxJava使用

RxJava观察者模式4种角色

RxJava用到了设计模式中的观察者模式。
生产:
1、Observable(被观察者)
2、Subject
消费:
3、Observer(观察者)
Observer是一个接口,提供了3个方法:onNext(T t), onError(Throwable e), onCompleted()。
4、Subscriber(订阅者)
Subscriber是Observer的子类,class Subscriber implements Observer, Subscription。
Subscriber在Observer的基础上增加了
onStart():这个方法在观察者和被观察者建立订阅关系后,而被观察者向观察者发送消息前调用,主要用于做一些初始化工作,如数据的清零或重置。
unsubscribe():这个方法用于取消订阅,若isUnsubscribed()为true,则观察者不能收到被观察者的消息。

创建Observable(被观察者)的方式

1、create,当Observable与Observer/Subscriber建立订阅关系(.subscribe(new Observer(){}))的时候,call()会被调用
Observable.create(Observable.OnSubscribe)

1
2
3
4
5
6
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});

如下,观察者Subscriber 将会被调用一次 onNext() 和一次 onCompleted()

1
2
3
4
5
6
7
Observable.create(new Observable.OnSubscribe<ArrayList<Thumbnail>>() {
@Override
public void call(Subscriber<? super ArrayList<Thumbnail>> subscriber) {
subscriber.onNext(mDSCBrowserModel.getThumbnailList());
subscriber.onCompleted();
}
});

2、just,这个方法可以传1-N个类型相同的参数,并会按照传入的参数的顺序来发射它们。just()方法也可以接受列表或数组,就像from()方法,但是它不会迭代列表发射每个值,它将会发射整个列表。
Observable.just(T…)

1
Observable.just(getBatteryLevel());

3、from,这个方法可以传数组或Iterable
Observable.from(T[]), Observable.from(Iterable<? extends T>)

1
2
String[] hello = {"Hello", "World"};
Observable observable = Observable.from(hello);

4、Observable.empty(),Observable.never(),和Observable.throw()
当我们需要一个Observable毫无理由的不再发射数据正常结束时,我们可以使用empty()。我们可以使用never()创建一个不发射数据并且也永远不会结束的Observable。我们也可以使用throw()创建一个不发射数据并且以错误结束的Observable。

创建Observer(观察者)及Subscriber(订阅者)的方式

根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射,onCompleted和onError被称作通知。
实现Observer接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};

或实现Observer抽象类Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};

Subscribe(订阅)的方式

创建了 Observable 和 Observer/Subscriber 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了(目前在同一线程中)

1
2
3
observable.subscribe(observer);
// 或
observable.subscribe(subscriber);

场景示例

如何使用RxJava替代AsyncTask/Handle/Thread,实现“后台处理,前台回调”的异步机制?
1、简单的一次性消费

1
2
3
4
5
6
new Thread(new Runnable() {
@Override
public void run() {
getBatteryLevel();
}
}).start();

RxJava实现

1
2
3
4
5
6
7
8
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
getBatteryLevel();
}
}).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe();// 订阅时执行call方法

2、后台处理,前台回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Handler mHandler = new Handler(Looper.getMainLooper());
int level = 0;
new Thread(new Runnable() {
@Override
public void run() {
// run in thread
level = getBatteryLevel();
mHandler.post(new Runnable() {
@Override
public void run() {
//update in UI
mTvLevel.setText(""+level);
}
});
}
}).start();

RxJava实现

1
2
3
4
5
6
7
8
9
Observable.just(getBatteryLevel())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
mTvLevel.setText(""+level);
}
});

3、带进度条的更新
AsyncTask的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private class DeleteTask extends AsyncTask<Void, String, Void> {
@Override
protected void onPreExecute() {
super.onPreExecute();
mListener.onStartDelete();
}
@Override
protected Void doInBackground(Void... params) {
//此处省略...
publishProgress("" + (int) progress);
return null;
}
@Override
protected void onProgressUpdate(String... progress) {
mListener.onDeleteUpdate(Integer.parseInt(progress[0]));
}
@Override
protected void onPostExecute(Void aVoid) {
super.onPostExecute(aVoid);
mListener.onDeleteComplete();
}
}

RxJava的实现,需要用到Subject,Subject有四个实现类,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject,每个实现类都有特定的功能,参见
因此处在AsyncTask中更改,回调暂时保留,其实这里回调可去掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private PublishSubject<Integer> mDeleteProgress = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
delete(lists);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Action0() {
@Override
public void call() {
mDeleteListener.onStartDelete();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
mDeleteListener.onDeleteComplete();
}
}).subscribe();
mDeleteProgress.distinct().observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscribe<Integer>() {
@Override
public void onError(Throwable e) {
Logger.d(TAG, "error:" + e.getMessage());
}
@Override
public void onNext(T t) {
mDeleteListener.onDeleteUpdate(integer);
}
@Override
public void onCompleted() {
}
});
//在delete方法中需要更新进度条的地方调用
mDeleteProgress.onNext((int) progress);

RxJava的二次封装

其实上面场景中会产生些重复代码,因此对RxJava二次封装,参见

1、线程(Scheduler)封装
每次调用,可封装为一个线程调度类
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class RxSchedulersHelper {
/**
* 在无数量上限的线程池执行IO操作(读写文件、读写数据库、网络信息交互等)
* 在主线程回调
*/
public static <T> Observable.Transformer<T, T> io_main() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 在自定义数目的线程池执行
* 在主线程回调
*
* @param threadCount 自定义的线程数
*/
public static <T> Observable.Transformer<T, T> custom_main(int threadCount) {
final ExecutorService es = Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.subscribeOn(Schedulers.from(es))
.observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 在数目固定为CPU核数的线程池执行computation操作(CPU计算用的线程,适合于CPU密集型计算,不能操作文件、数据库和网络)
* 在主线程回调
*/
public static <T> Observable.Transformer<T, T> computation_main() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 启用新线程执行
* 在主线程回调
*/
public static <T> Observable.Transformer<T, T> new_thread_main() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
return tObservable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
}

2、对服务器返回结果封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RxResultHelper {
/**
* 可处理所有继承Response的服务器返回结果
*/
public static Observable.Transformer<Response, Response> handleResult() {
return new Observable.Transformer<Response, Response>() {
@Override
public Observable<Response> call(Observable<Response> responseObservable) {
return responseObservable.flatMap(new Func1<Response, Observable<Response>>() {
@Override
public Observable<Response> call(Response result) {
if (null == result || result.getCodeResponse() != NetErrorID.ERR_SUCCESS) {
return Observable.error(new ServerException(result.getCodeResponse()));
} else {
return createData(result);
}
}
});
}
};
}
private static Observable<Response> createData(final Response result) {
return Observable.create(new Observable.OnSubscribe<Response>() {
@Override
public void call(Subscriber<? super Response> subscriber) {
try {
subscriber.onNext(result);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
}

3、对观察者封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class RxSubscribe<T> extends Subscriber<T> {
private final String TAG = RxSubscribe.this.getClass().getSimpleName();
protected abstract void _onNext(T t);
@Override
public void onError(Throwable e) {
Logger.d(TAG, "error:" + e.getMessage());
}
@Override
public void onNext(T t) {
_onNext(t);
}
@Override
public void onCompleted() {
}
}

封装后的代码可简化为,以zip(合并)举例,此代码可直接放在Activity或controller中,所见及所得,一条链下来思路比较清晰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.zip(
Observable.just(NetCommandMgrImpl.getInstance().getFreeSpace()),
Observable.just(NetCommandMgrImpl.getInstance().getBatteryLevel()),
Observable.just(NetCommandMgrImpl.getInstance().getTouchEnable()),
Observable.just(NetCommandMgrImpl.getInstance().getSkinBeautyEnable()),
Observable.just(NetCommandMgrImpl.getInstance().getEVCompensation()),
new Func5<ResponseGetFreeSpace, ResponseGetBatteryLevel, ResponseGetOneByte, ResponseGetOneByte, ResponseGetOneShort, CameraStatus>() {
@Override
public CameraStatus call(ResponseGetFreeSpace responseGetFreeSpace, ResponseGetBatteryLevel responseGetBatteryLevel, ResponseGetOneByte responseGetTouchEnable, ResponseGetOneByte responseGetSkinBeautyEnable, ResponseGetOneShort responseGetEvStatus) {
CameraStatus cameraStatus = new CameraStatus();
cameraStatus.setRemainCount(responseGetFreeSpace.getJpgRemainCount());
cameraStatus.setBatteryLevel(responseGetBatteryLevel.getBatteryLevel());
cameraStatus.setTouchShutter(responseGetTouchEnable.getValue() == 0 ? false : true);
cameraStatus.setSkinBeauty(responseGetSkinBeautyEnable.getValue() == 0 ? false : true);
cameraStatus.setEVStatus(getEvStatus(responseGetEvStatus.getValue()));
return cameraStatus;
}
}
).compose(RxSchedulersHelper.<CameraStatus>io_main()).subscribe(new RxSubscribe<CameraStatus>() {
@Override
protected void _onNext(CameraStatus cameraStatus) {
mCameraStatusCtlView.initCameraStatus(cameraStatus);
}
});

参考

RxJava官方教程
RxJava官方教程中文翻译