Vert.x 用RxJava响应式编程 译<十一>

Java基础

浏览数:68

2020-5-30

TIP:相应的代码在step-8文件夹中(https://github.com/vert-x3/vertx-guide-for-java-devs)

到目前为止,我们已经探讨了Vert.x的多个领域,使用基于APIs的回调方法,这种编程模型在多钟编程语言中可以良好实现。然后,它会变得有的冗长乏味,尤其是处理多个时间源或者多个数据流。

这就是RxJava闪亮的地方,Vert.x可以与之无缝连接。

使用RxJava APIs

除了回调的API,Vert.x模块还提供了一个”Rxified”API,需要我们在maven的pom.xml中加入依赖:

然后我们把Verticles修改一下,需要将原来类中继承的io.vertx.core.AbstractVerticle修改成io.vertx.rxjava.core.AbstractVerticle,这有什么不同呢?前面的类继承了后者并且加入io.vertx.rxjava.core.Vertx属性。

io.vertx.rxjava.core.Vertx定义了额外的rxSomething(…​)方法,等同于callback-based的存在。

让我们看看MainVerticle,来找一找在实践中更好的实现:

rxDeploy方法没有把Handler<AsyncResult<String>>作为一个final的参数,并放回一个Single<String>

以及,这个操作并不是在方法被调用执行,在你订阅这个Single执行,当这个操作完成,它返回部署的 id 或者异常抛出的信息。

依次部署verticles

重构MainVerticle,我们需要确定部署操作触发有序:

    1.flatMap中执行dbVerticleDeployment方法的结果,HttpServerVerticle部署的任务日常调度。

    2.当被订阅的时候执行,成功或者失败,MainVerticle的future会是完成或者失败。

Rxifying” HttpServerVerticle

如果你按序看这个guide,编辑这个代码,那么你的HttpServerVerticle使用的还是基于callback-based的API。在使用RxJava API执行异步的操作之前,你需要先重构HttpServerVerticle

Import RxJava versions of Vert.x classes

    1.我们的backupHandler()方法依旧使用HttpResponse类,因此这是必须导入的。RxJava版本中提供的HttpResponse在某些情况下可以被替换,在step-8文件夹中的没有导入这个类,因为通过lambda表达式推到的原因。

委托“Rxified” vertx实例

当你有一个io.vertx.rxjava.core.Vertx的时候可以调用一个io.vertx.core.Vertx实例,需要调整Verticle’s srart()创建WikiDatabaseService实例的方法:

同时执行授权查询

在前面的例子中,我们看到使用RxJava operators 和 Rxified Vert.x API 来依次执行异步操作。但是有时候这是不需要的,或者你只是想让他们同时运行出于性能的原因。

对于这样的情景,HttpServerVerticle中的JWT 贴可能生成程序是个好的例子。为了创建一个token,我们需要权限查询完成,但是查询是独立的:

    1.创建了三个Single 对象,代表了不同的权限查询

    2.当三个操作完成,zip操作带着results回调。

数据库连接

从连接池中获取一个数据库连接,需要做的事情就是JDBCClient调用rxGetConnection

Single<SQLConnection> connection = dbClient.rxGetConnection();

这个方法返回Single<Connection>,这可以使你容易的sql查询

Single<ResultSet> resultSet = dbClient.rxQueryWithParams(
  sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));

当SQLConnection没有用的时候,我们怎么释放这个连接呢?一个简单方便的方式是调用关闭close方法当

Single<SQLConnection>没有被订阅:

    1.当连接被请求的时候,我们把它放入Single对象中

    2.当没有被订阅的时候,Single调用close方法

现在当我们需要的额时候就可以使用getConnection来执行sql查询

为callbacks和RxJava之间搭桥

这个时候,你可能混淆了RxJava 代码和callback-based API,例如service proxy 接口定义为callbacks,但是实现使用了Vert.x Rxified API。

在这个例子中,io.vertx.rx.java.RxHelper可以适配Handler<AsyncResult<T>>到RxJava Subscriber<T>:

1.fetchAllPagesData是一个一个异步的service proxy操作,在Handler<AsyncResult<List<JsonObject>>>调用中定义

2.toSubscriber适配resultHandler到Subscriber<List<JsonObject>>

数据流

RxJava不仅仅在组合不同的时间源做的很好,对于数据流也是非常好的,不像Vert.x或者JDK中的fututre,Observable可以发布事件流,而不是单个,它配备了一套广泛的数据操作,我们可以使用其中的把一部分来重构我们的fetchAllPages database verticle 方法:

    1.利用flatMapObservable我们可以从Single<Result>发起中创建Observable

    2.from方法将数据库results迭代入Observable

    3.因为我们只需要网页名称,可以映射的每个JSONObject到第一列

    4.客户端期望数据是按字母顺序排列

    5.event bus service由一个JsonArray组成,collect用来JsonArray::new创建一个新的,然后添加JsonArray::add

 

 

原文链接:http://vertx.io/docs/guide-for-java-devs/

我的微信公众号:

作者:woshixin