Maxiee 的 RxJava 学习指南 (2)

redmouse 发布于1年前 阅读300次
0 条评论

在上一篇中主要对官方文档进行了学习.

如果只看文档里的知识而不去实践, 自己只是知道而非理解, 必须要在实际项目中进行实践, 才能得到真知.

我采取的办法就是找一个实际的开源项目来学习, 最终我选择了 RxJava-Android-Samples 这个项目.

RxJava-Android-Samples

选择 RxJava-Android-Samples 这个项目的原因通过它的介绍可以看出:

This is a repository with real-world useful examples of using RxJava with Android.

这不正是我们需要的吗! 感谢 kaushikgopal 的无私奉献.

下面我们通过对这个项目的源码阅读, 来学习 RxJava 的实践知识.

整个代码结构十分清晰, 在 MainFragment 上点击对应的按钮进入对应 Demo 的 Fragment.

后续的文章中会针对代码的要点进行讲解, 因此建议将这个项目 Clone 下来结合文章一起看, 否则可能会觉得有些跳跃, 但是结合代码看就很容易明白了 .

后台任务与并发

本文中, 我们学习 RxJava-Android-Samples 的第一个 Demo -- 后台任务与并发, 它位于 ConcurrencyWithSchedulersDemoFragment .

这个页面的功能是点击一个按钮, 然后会在后台做一个延时操作 (以定时器模拟), 待定时操作完成后, 在界面中打一段 Log.

这部分代码大体流程如下图所示:

Maxiee 的 RxJava 学习指南 (2)

下面来具体分析.

创建 Observable

创建 Observable 的代码位于 _getObservable() 方法, 它的代码为:

private Observable<Boolean> _getObservable() {
    return Observable.just(true)
        .map(
            aBoolean -> {
                _log("Within Observable");
                _doSomeLongOperation_thatBlocksCurrentThread();
                return aBoolean;
            });
}

其中:

  • 源 Observable 发出一个 true 数据
  • 数据传入 map 运算符
  • map 运算符的 _doSomeLongOperation_thatBlocksCurrentThread() 方法里执行了一个阻塞线程的耗时操作
  • 当执行完毕后, 返回 aBoolean 也就是传递数据

_doSomeLongOperation_thatBlocksCurrentThread() 的代码为:

private void _doSomeLongOperation_thatBlocksCurrentThread() {
    _log("performing long operation");

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        Timber.d("Operation was interrupted");
    }
}

功能就是一个阻塞线程 3s.

从中我们可以看出, 这个 map 操作符里面, 就是执行耗时操作的地方.

创建 Observer

创建 Observer 的代码位于 _getDisposableObserver() 方法中, 它的代码为:

private DisposableObserver<Boolean> _getDisposableObserver() {
    return new DisposableObserver<Boolean>() {

        @Override
        public void onComplete() {
            _log("On complete");
            _progress.setVisibility(View.INVISIBLE);
        }

        @Override
        public void onError(Throwable e) {
            Timber.e(e, "Error in RxJava Demo concurrency");
            _log(String.format("Boo! Error %s", e.getMessage()));
            _progress.setVisibility(View.INVISIBLE);
        }

        @Override
        public void onNext(Boolean bool) {
            _log(String.format("onNext with return value \"%b\"", bool));
        }
    };
}

其中, onComplete , onError 和 onNext 都比较易懂, 这里有一个问题就是 DisposableObserver 是什么? Disposable 是什么意思?

DisposableObserver

DisposableObserver 的 在线注释文档 中做出了说明:

DisposableObserver 是一个抽象的 Observer, 它通过实现了 Disposable 接口允许 异步取消 .

现在知道 DisposableObserver 是对被观察者做了什么扩展了, 它支持 异步取消 .

所有 DisposableObserver 预先实现的 final 方法都是线程安全的.

取消 DisposableObserver 需要调用它的 dispose() 方法, 这个方法既可以在 DisposableObserver 之外调用, 也可以在它的 onNext 里面调用, 实现自己取消自己.

我的第一反应就是 DisposableObserver 很适合配合 Fragment 的生命周期释放资源, 这里的确也是这样用的, 这个放在后面部分再说.

建立订阅关系

有了被观察者, 也有了观察者, 下面要做的就是建立订阅关系, 具体的代码为:

_getObservable()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(d);

这里有一个线程切换的操作. 这里我把上一篇中梳理的规则再摆过来, 作为参考:

  • subscribeOn 出现的顺序位置没有关系, 只要指定了 subscribeOn , 那么被观察者就在它指定的线程上运行
  • 在整个链条中 subscribeOn 只调用一次
  • observeOn 可以多次调用, 每调用一次, 链条下游的就切换一次线程

参照着规则看代码, 就理解了.

除此之外, 我的第一印象是, 订阅成功后, 会返回一个订阅对象, 但看代码并没有返回. 这是为什么呢? 通过查阅 文档 , 原来这是 RxJava 2 的一点变化,

订阅 (Subscription) 的变化

在 RxJava 1 中, rx.Subscription 负责流和资源的生命周期管理.

在 RxJava 2 中 Subscription 这个名称改作他用了, Reactive-Streams 规范使用这个名字专门用于表示源于消费者之间的交互点 (interaction point), 类名为 org.reactivestreams.Subscription , 允许请求一个正的数量值并允许取消序列.

既然 Subscription 这个名称被占用了, 在 RxJava 2 中原本的订阅概念改了一个新名字 io.reactivex.Disposable .

看到这里, 就更加理解 DisposableObserver 的意思了, 原来 DisposableObserver 中的那个异步取消, 就是新的取消订阅关系.

变化中还有一点, 是原来的 CompositeSubscription 改名为 CompositeDisposable , 这个东东在下面会用到.

生命周期管理

上一节建立订阅关系后, 点击按钮就能够正常执行异步操作了. 这一节主要讨论生命周期.

上一节建立订阅关系后, 在后面紧跟着的还有一行代码:

_disposables.add(d);

_disposables 这个成员的定义:

private CompositeDisposable _disposables = new CompositeDisposable();

这就是上一节最后提到的.

这样, 当建立一个订阅关系后, 会把 DisposableObserver 添加到 CompositeDisposable 中, 等到 Fragment 生命周期结束时, 一并清掉:

@Override
public void onDestroy() {
    super.onDestroy();
    _disposables.clear();
}

这个 clear 里面的内部实现就是对所有添加进去的 Disposable 挨个执行 dispose 进行取消操作.

从而实现 Fragment 对多个订阅关系进行统一生命周期管理, 在生命周期结束时统一取消的逻辑.

总结

在这一篇中, 完成了对第一个 Demo 的学习.

通过学习, 对很多概念有了更加全面的认识.

这一篇中主要包含了以下重点内容:

  • 建立基本的观察者-被观察者订阅关系
  • 用 RxJava 来执行异步操作, 可以替代 AsyncTask

  • Fragment 结合 CompositeDisposable 进行多个订阅关系的生命周期统一管理

在下一篇中, 我将继续对剩下 Demo 进行代码阅读, 希望有了前两篇的基础, 学习的脚步能够更快一些.

微信公众号

我开通了微信公众号: MaxieeTalk , 欢迎订阅, 及时收到我的最新技术文章~

打赏

如果这篇文章对您有所帮助, 欢迎扫描下面支付宝二维码请我喝杯可乐~

您的打赏将会极大提高我的写作热情, 激励我更新更多更好的博客.

Maxiee 的 RxJava 学习指南 (2)

查看原文: Maxiee 的 RxJava 学习指南 (2)

需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。