rxKotlin 响应式编程

Java基础

浏览数:210

2019-8-22

前言

rxKotlin :一个在JVM上使用可观测的序列来组成异步的、基于事件的程序的库。

我所理解的 rxKotlin 是一个实现异步操作的库,Android开发过程中将会用到很多异步操作,这种响应式编程的方式能使程序可读性提高,思路清晰,使开发人员能更好地去做代码维护。

为什么推荐RxKotlin以及Kotlin语言

Kotlin是由JetBrains公司最新开发的基于JVM的编程语言,2017 的Google IO 上Kotlin正式成为Android 开发的官方语言。

在我们的日常Android开发过程中,有太多的业务需要用到异步操作,时而开启新线程,时而切回主线程。业务庞大之后,看着自己之前写的业务代码,容易一脸懵逼 +_+ ,如果写了批注估计能好点,要是当时没写批注,估计会瞬间爆炸💥。

先贴一组我分别用java 和 kotlin 写的例子: 读取一组文件夹中png格式的图片,并在UI界面上进行相应的处理操作。
先贴出的是我用java写的..以前我可能还真会这么去写…有时候写着写着代码就直接飞到屏幕外面去了 O(∩_∩)O

  final List<File> folders = new ArrayList<>();
        new Thread(){
            @Override
            public void run() {
                super.run();
                for (File folder : folders) {
                    File[] files = folder.listFiles();
                    for (File file : files) {
                        if (file.getName().endsWith(".png")) {
                            Bitmap bitmap = BitmapFactory.decodeFile(file.getAbsolutePath());
                            runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    // 处理 bitmap
                                }
                            });
                        }
                    }
                }
            }
        }.start();

接下来是用rxKotlin写的相同功能的代码

    Observable.from(folders)
                .flatMap {
                    Observable.from(it.listFiles())
                }
                .filter {
                    it.name.endsWith(".png")
                }
                .map {
                    BitmapFactory.decodeFile(it.absolutePath)
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // 处理 bitmap
                }

肿么样?是不是感觉瞬间逻辑上就变得清晰了起来?(主要是逼格一下子就上去了🙃)
这就是我为什么推崇kotlin和响应式编程的理由。可能一个人学会一门技能,他可以靠着这项技能吃一辈子的饭,但是,如果他不断更新或加强这项技能,也许他很快就能吃上肉吧。

原理解析

1. 核心:观察者模式

该库的异步实现,是通过扩展的观察者模式来实现的。
什么是观察者模式?用一个通俗点的例子说明。

按钮点击事件的观察者模式

这里,Button作为一个被观察的对象,观察者OnClickListenner 监听其点击事件。 当按钮被点击时,触发一个 OnClick的消息事件,然后传递给 OnClickListener。如果用观察者模式的方式去定义此次点击事件的话,我会这么说: 观察者 OnClickListener 通过setOnClickListener()去订阅了 被观察者Button ,一旦Button被点击,变回触发事件 OnClick
换言之,就有了如下的对应关系:
Button —> 被观察者
OnclickListener —> 观察者
setOnClickListener() —> 订阅
OnClick() —> 事件

2. 实现方式

哦差点忘了,本文中的链式操作如map, filter 之类的api在这里我就不一一介绍了,感兴趣的话可以去找一些rxJava的文章看看它们的介绍与用法,一抹多的文章都介绍了它们的基本用法。

1)创建 observer

创建观察者observer,它将决定事件到来的时候做出什么样的动作。

   var observer = object : Observer<String> {
           override fun onNext(t: String?) {
               Log.e("name",t)
           }

           override fun onError(e: Throwable?) {
               Log.e("error",e.toString())
           }

           override fun onCompleted() {
               Log.e("status","completed!!")
           }

       }

它里面有三个会掉方法,可以自行定义当事件到来时,执行什么样的响应操作。随着每一个事件到来,正常的话会在回调onNext()中响应。若所有事件都执行完毕,则会调用onCompleted()

2) 创建Observable

Observable被观察者,它将决定触发什么事件以及事件触发的规则。

  var observable = Observable.create<String> {
            it.onStart()
            it.onNext("jiangyu")
            it.onNext("jy")
            it.onNext("god")
            it.onCompleted()
        }

这里,我创建了三个事件,依次是发送”jiangyu”,”jy”,”god”,这三个字符串,然后调用onCompleted()作为事件发送完毕标记。

3)Subscribe 订阅

有了观察者和被观察者,使用subscribe订阅让二者连接起来。
即:

observable.subscribe(observer)

发现没有,这里逻辑上是反的,可能会和思维上有些出入。明显这里的逻辑变成了被观察者去订阅观察者,为什么要和人们的惯性思维背道而驰呢?下面来讲解一下这流式API的工作方式

3. API工作原理

首先放上一张刚才的程序运行截图

运行截图

仔细回想一下刚才所创建的观察者和被观察者,看看他们都做了些什么事。我总结性的概括一下 :1.首先被观察者定义了发生的事件以及事件发生的顺序(就是这里的发送字符串的顺序)2.每一个事件到来时,观察者对到来的事件进行处理(这里为打印日志)

以上代码和逻辑分析又可以写成这样一个等价的代码段(除了没有重写onCompleted),更能方便对于整个过程的理解:

   Observable
                .just("jiangyu", "jy", "god")
                .subscribe {
                    Log.e("name",it)
                }

这是一种API提供的更加简洁的写法,网上对于这种流式操作有着各种理解,有人说这就像是一个发射器,将事件一个一个的发射然后处理。

为了更进一步展示工作原理,我对代码做了如下的变形:

 Observable
                .just("jiangyu", "jy", "god","000")
                .filter {
                    Log.e("start with j", it)
                    it.startsWith("j")
                }
                .map {
                    Log.e("name", it)
                    it.toUpperCase()
                }
                .subscribe {
                    Log.e("after map name", it.toString())
                }

测试结果

结果表明,不是说所有元素执行完filter 过滤再执行map映射的,意思是流上的事件或元素都沿着链垂直移动。这正是印证了之前所说的,这就好比发射器,将事件一个一个发射出去并依次处理。

线程控制

终于,讲了辣磨多观察者模式的原理,现在正是进入正题!一起来研究rxKotlin响应式编程在Andorid中的使用方式。在Android开发过程中,最重要的无非就是线程间的切换。因此,掌握Scheduler调度器(线程控制器)的工作方式和使用方法,我认为大概率就可以在Android开发过程中使用rxKotlin掌控雷电了⚡️

先介绍几个常用的API自带的Scheduler:

  • Schedulers.newThread(): 总是启用新线程并执行操作
  • Schedulers.io(): 主要用于读写文件、网络请求等,功能上和newThread()差不多,但是他是用了无上限线程池,并能够复用空闲的线程。
  • AndroidSchedulers.mainThread(): 指定在主线程运行

那我们怎么来对线程进行控制呢?先来看如下代码:

 @GET("getUsers")
    fun getUsers(@Query("token") token: String):Array<User>

 val retrofit = Retrofit
                .Builder()
                .client(OkHttpClient())
                .addConverterFactory(GsonConverterFactory.create())
                .build()
        val userService = retrofit.create(UserService::class.java)

        Observable.from(userService.getUsers("token"))// io 线程
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .filter {
                    it.age < 18
                }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // 在UI主线程上操作
                }

这里结合了使用Retrofit网络框架,举了一个线程切换的栗子,可能这个栗子有点烂….主要操作就是先获取用户信息,然后过滤得到年龄小于18岁的用户,最后在UI上完成操作。
我将subsribeOnobserveOn的作用域简单的标注了一下:

作用域

简单总结一下的话,可以分为如下三点:

  • subscribeOn 控制其之前的语句所处线程
  • observeOn 控制其之后语句,到下一个observeOn之前的语句所处线程
  • observeOn 之后如果再添加subscribeOn会没有任何作用

线程间的切换游刃有余,是什么样的强大原理支撑这样的操作呢?让我们来慢慢研究。首先来普及并解释一下我认为rxKotlin中比较关键的操作 – 变换

map & flatMap

前面的代码中其实我已经用到很多了,个人呢认为这两兄弟在rxKotlin中是相当重要的。

map:
这里呢我们可以认为他相当于映射,也有那么点意思在里面,但是也不完全是吧。我认为更好的解释是 对于事件对象的变化操作吧,即由一个事件对象转变为另一个事件对象。

 Observable
                .just(user1, user2, user3)
                .map { 
                    it.name
                }
                .subscribe {
                    // print  这里打印的是每个user的名字
                }

这很好理解,我传入三个user,对于每一个user我通过map来返回他们对应的名字,然后再打印出来。

flatMap:
这个的概念比较难去理解,我把它理解为”铺平”操作。何谓”铺平”操作呢?再来看一个栗子🌰

// User的数据结构
data class User(val name: String, val age: Int, val courses : Array<Course>)

 Observable
                .from(arrayOf(user1,user2,user3))
                .flatMap {
                    Observable.from(it.courses)
                }
                .filter {
                    it.classRoom.equals("528")
                }

首先还是将三个user作为输入并依次处理,对于每一个user,在flatMap操作中取出它的courses,此时并不发送它们,而是激活。接着再初始化一个新的Observable,相当于将每一个user对应的courses这一堆课程再次分发下去。 很难理解……用心体会😄

为了更好的去理解这些操作的原理,先来看看map的API:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

map中传入的是一个function(将T 转换为 R),再通过lift() 方法返回一个Observable<R> 。没错,传入的是一个函数,这属于高阶函数,即传入的参数或返回的参数是一个函数。
然后我们再来看看lift()的API:

 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

很有意思的是他在返回的Observable<R>中有一个OnSubscribe<R>的一个监听,是对该Observable<R>subscibe的一个监听,只有当发生该事件的subscribe时,才会触发上述代码中call()方法里面的操作。仔细回想一下之前我们提到的,为什么事件的处理顺序是一个垂直的方向进行的,这就很好的解释了这个疑惑点。像map这样的中间操作过程,都相当于是一个模板,它会定义一个事件会进行怎样的变换操作,但是却不会立刻执行,只有等到其监听到subscribe()方法时才会触发变换。

线程切换的原理也大同小异,同样是用到这个核心的lift()方法,具体可以自己去参见API中的源码。

总结

研究Kotlin语言 和 rxKotlin、rxJava 我也处于一个起步的阶段,也算是发现了这些新鲜东西的一些亮点之处吧,自己研究了一番然后把研究出来的一些东西总结与分享了出来。希望可以在以后的项目和学习过程中更深入地去理解它的原理。

By the way , 不管是看到第一段就直接跳到这儿的,还是中途睡着的无意间手抖滑到这儿的…
↙️↙️↙️左下角 ❤️ 谢谢
😄😄😄😄😄😄😄😄

作者:JYGod丶