侧边栏壁纸
博主头像
敢敢雷博主等级

永言配命,自求多福

  • 累计撰写 57 篇文章
  • 累计创建 0 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

响应式编程学习(一)

敢敢雷
2022-03-15 / 0 评论 / 1 点赞 / 1,658 阅读 / 2,013 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我删除。

响应式编程是一种异步编程范式,它涉及数据流和变化的传播。这意味着,通过它可以使用你擅长的编程语言轻松表达静态(例如数组)或动态(例如事件发射器SSE)数据流

1、获取数据场景

假设现在有个场景,需要通过爬虫的方式,获得第三方网页的数据,且第三方的数据是分页表格的,我们需要获得前50页的数据。

1.1、阻塞编程

也就是我们常开发的代码。代码可能会这样写。

// 请求50页数据
for(int i = 1 ;i <= 50;i++){
  // 带上i分页请求第三方网页
  response = Http.get(xxxxx);
  // 处理第三方的数据
  object = handle(reposne);
}
// 对数据其他处理...
....

画成图就是这个样子
image.png

1.1.1、阻塞编程的问题

中途有一个阻塞等待的过程,很多时间线程被浪费在了阻塞上,无法执行其他的处理流程。
image.png

假设,第三方网页的响应时间至少是2s,那么我们请求50次,每次需要等到上一次有了response才请求下一次,所以至少需要100s才能获得第三方网页的全部数据。

所以,第一步,先解决获得响应数据阻塞的问题。

1.2、异步编程

要非阻塞,那就来个异步

1.2.1、异步回调

异步回调一般在开发中遇到的很多,比如场景的对接第三方的支付,我们请求数据过后,然后约定一个回调callback地址,对方将支付结果异步再通知到我们自己的系统。

但在上面的爬虫场景下,咋个与第三方约定约定回调callback地址噢。但是可以通过,微服务,我们再单独再开一个服务去解决这个问题。将请求第三方的网页的功能给中间服务去完成,通过中间服务获得网页数据,中间服务再把单页的数据调用服务A的接口,返回给服务A。

现在有了两个服务。

  1. 服务A循环请求服务B的/getData?page=接口
    image.png

  2. 服务B请求第三方网页第i页的数据
    image.png

  3. 服务B调用服务A的回调地址/callBack?page=&data=,将第i页的数据给服务A处理
    image.png

整个流程串起来就是这样的。
image.png

通过中间服务B,实现了不管第三方网站有没有响应上一次的请求结果,我们都可以请求下一次。

当服务B收到了第三方网页的响应数据,再直接通过服务A的回调接口将数据回传给服务A。

即使第三方响应时间至少是2s(服务B阻塞等待数据的时间),木桶效应说得好,我们只需要关心这个第三方网页它最慢的响应时间。最慢的响应时间就是我们整个流程走完的时间(不包括服务A处理data)。

回调体现的是一种双向的调用方式,实现两个服务之间的解耦。从而使调用链路不发生阻塞。

1.2.2、异步回调的问题

但是回调的最大问题是复杂,在执行流程包含了多层的异步执行和回调就会形成嵌套结构。也就是回调套回调套回调,代码套娃不容易理解,变成回调地狱。

image20220515161048629.png

回调很难大规模的组合起来使用,因为容易导致代码难以理解和维护。(不清楚整个回调链路,谁特么知道这个调用跑到哪里去了..)

1.2.3、Future接收多线程的执行结果

在并发编程中使用的非阻塞模型。用Future接收多线程的执行结果。主线程通过主动轮询future.get(),直到返回结果可用为止。

public static class Task implements Callable<String> {
  @Override
  public String call() throws Exception {
    // 带上i分页请求第三方网页
    response = Http.get(xxxxx);
    // 处理第三方的数据
    return handle(reposne);
  }
}
//异步线程的结果Future。
List<Future<Object>> responseFutureList = new ArrayList<Future<Object>>();
ExecutorService es = Executors.newCachedThreadPool();
for(int i=1; i<=50;i++){
  //提交50次线程
  responseFutureList.add(es.submit(new Task()));
}
for(Future<String> res : responseFutureList){
  System.out.println(res.get());
}

涉及多线程就有上下文切换的问题,上下文切换的成本很高。会导致资源利用率不高。

2、响应式编程

异步编程可以解决我们在获得爬取50页数据的阻塞问题。但各有缺陷...这个时候,就可以使用响应式编程了。

2.1、响应式编程的概念

前端用这玩意用的很多。但是响应式编程没必要和具体的编程领域关联,它只是一个普遍适用的概念和编程模型。

本质就是使用异步数据流进行编程,通过库和函数的方式提供一套处理异步数据流的接口规范。

响应式编程两个核心概念:

  1. 异步。不需要等待处理完成便立刻返回,通过回调将处理结果带回来,提高吞吐量、性能和效率。
  2. 数据流。按照时间线的时间序列,任何东西都可以是一个数据流,在这个基础上通过函数来组合、过滤和变换这些数据流。通过事件处理器来异步监听数据流。(观察者)

2.2、RxJava

RxJava是“用于组合异步和基于事件的程序的库”,RxJava是反应式编程的具体实现,受到了函数式编程以及数据流编程的影响。

也是我在学习也在使用的...

2.2.1、RxJava是如何运行的

RxJava的核心是Observable类型,给数据或事件的流。

目的是实现推送(反应式),也可以用于拉取(交互式)。

可以同步使用,也可以异步使用,代码随着时间推移产生的0个、1个、多个或者无穷个值或事件。

2.2.2、推送和拉取

RxJava实现反应式的要点在于它支持推送,所以Observable和关联的Observer类型签名支持把事件推送给它。这通常会伴随着异步。

Observable类型还支持一个异步的反馈通道,作为异步系统中的一种流控制或回压方式。

Observable/Observer通过订阅进行链接,Observable代表了数据流,它可以被Observer订阅。

interface Observable<T> {
 Subscription subscribe(Observer s)
}

订阅之后,Observe就能够接收三种推送它的事件。

  • 通过onNext()函数推送的数据
  • 通过onError()函数推送的异常或Throwable
  • 通过onCompleted()函数推送的流完成信息
interface Observer<T> {
 void onNext(T t)
 void onError(Throwable t)
 void onCompleted()
}

onNext() 可能永远也不会被调用,也可能会被调用一次、多次或无数次。

onError()和 onCompleted() 是终端事件,这意味着两者只能有一个被调用,并且只能被调用一次。

终端事件调用之后,Observable流就完成了,以后就不能在向它发送事件了。如果流是无限的,并且没有发生故障,那么终端事件可能永远不会发生。

学习来源:

https://www.zhihu.com/zvideo/1413144562139705344

《RxJava反应式编程》

1

评论区