深入理解RxJava编程思想( 五 )


实现观察者(这里指用户)
//观察者 实现public class UserPerson implements Observer {private static final String TAG = MainActivity.class.getSimpleName();private String name;private String message;public UserPerson(String name) {this.name = name;}@Overridepublic void update(Object o) {this.message = (String) o;readMessage();}public void readMessage() {Log.d(TAG, name + " 读取到消息: " + message);}}
写测试程序
public static void test() {//编辑部,编辑好的文案内容String msg = "学习Android必须学习Kotlin,哈哈";//创建一个微信公众号(被观察者)Observable server = new WechatServerObservable();//创建两个用户(观察者)Observer zhangsan = new UserPerson("张三");Observer lisi = new UserPerson("李四");//关注微信公众号server.addObserver(zhangsan);server.addObserver(lisi);//lisi取消了关注server.removeObserver(lisi);//微信公众号推送消息server.pushMessage(msg);}
上面就是一个标准的观察者设计模式
Hook点
首先了解什么是Hook?
Hook 技术又叫做钩子函数,在系统没有调用该函数之前,钩子程序就先捕获该消息,钩子函数先得到控制权,这时钩子函数既可以加工处理(改变)该函数的执行行为,还可以强制结束消息的传递 。简单来说,就是把系统的程序拉出来变成我们自己执行代码片段 。
找到的Hook点
/*** todo RxJava Hook点*/@SuppressLint("CheckResult")public static void testHook() {Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter emitter) throws Exception {emitter.onNext("A");}}).map(new Function() {@Overridepublic String apply(Object o) throws Exception {return "null";}}).subscribe(new Consumer>() {@Overridepublic void accept(String s) throws Exception {}});}
全局监听 hook
//Hook之前的监听RxJavaPlugins.setOnObservableAssembly(new Function() {@Overridepublic Observable apply(Observable observable) throws Exception {Log.d(TAG, "apply: 整个项目全局监听 到底多少个地方使用RxJava:" + observable);return observable;//todo 不破坏人家的功能}});
结论:很多操作符,都会经过【】监听
的观察者模式 创建创建使用()订阅
首先我们分析的源码:
public interface Observer {void onSubscribe(@NonNull Disposable d);//一订阅就会执行void onNext(@NonNull T t);//拿到上一个卡片流下来的数据void onError(@NonNull Throwable e);//拿到上一个卡片流下来的错误数据void onComplete();//事件结束}
的源码很简单,就是一个泛型接口,有4个函数,真正实现这个接口就在这里:
new Observer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(String s) {}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}}
接下来看创建过程,源码分析:
深入理解RxJava编程思想

文章插图
public static Observable create(ObservableOnSubscribe source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate(source));}
点进creat里面,看到creat传进去一个soure,这个我们称之为自定义,然后再将自定义丢进中,再进入看:
public final class ObservableCreate extends Observable {final ObservableOnSubscribe source;public ObservableCreate(ObservableOnSubscribe source) {this.source = source;}