响应式编程是一种异步编程范式,它涉及数据流和变化的传播。这意味着,通过它可以使用你擅长的编程语言轻松表达静态(例如数组)或动态(例如事件发射器SSE)数据流
1、获取数据场景
假设现在有个场景,需要通过爬虫的方式,获得第三方网页的数据,且第三方的数据是分页表格的,我们需要获得前50页的数据。
1.1、阻塞编程
也就是我们常开发的代码。代码可能会这样写。
// 请求50页数据
for(int i = 1 ;i <= 50;i++){
// 带上i分页请求第三方网页
response = Http.get(xxxxx);
// 处理第三方的数据
object = handle(reposne);
}
// 对数据其他处理...
....
画成图就是这个样子
1.1.1、阻塞编程的问题
中途有一个阻塞等待的过程,很多时间线程被浪费在了阻塞上,无法执行其他的处理流程。
假设,第三方网页的响应时间至少是2s,那么我们请求50次,每次需要等到上一次有了response才请求下一次,所以至少需要100s才能获得第三方网页的全部数据。
所以,第一步,先解决获得响应数据阻塞的问题。
1.2、异步编程
要非阻塞,那就来个异步
1.2.1、异步回调
异步回调一般在开发中遇到的很多,比如场景的对接第三方的支付,我们请求数据过后,然后约定一个回调callback地址,对方将支付结果异步再通知到我们自己的系统。
但在上面的爬虫场景下,咋个与第三方约定约定回调callback地址噢。但是可以通过,微服务,我们再单独再开一个服务去解决这个问题。将请求第三方的网页的功能给中间服务去完成,通过中间服务获得网页数据,中间服务再把单页的数据调用服务A的接口,返回给服务A。
现在有了两个服务。
-
服务A循环请求服务B的/getData?page=接口
-
服务B请求第三方网页第i页的数据
-
服务B调用服务A的回调地址/callBack?page=&data=,将第i页的数据给服务A处理
整个流程串起来就是这样的。
通过中间服务B,实现了不管第三方网站有没有响应上一次的请求结果,我们都可以请求下一次。
当服务B收到了第三方网页的响应数据,再直接通过服务A的回调接口将数据回传给服务A。
即使第三方响应时间至少是2s(服务B阻塞等待数据的时间),木桶效应说得好,我们只需要关心这个第三方网页它最慢的响应时间。最慢的响应时间就是我们整个流程走完的时间(不包括服务A处理data)。
回调体现的是一种双向的调用方式,实现两个服务之间的解耦。从而使调用链路不发生阻塞。
1.2.2、异步回调的问题
但是回调的最大问题是复杂,在执行流程包含了多层的异步执行和回调就会形成嵌套结构。也就是回调套回调套回调,代码套娃不容易理解,变成回调地狱。
回调很难大规模的组合起来使用,因为容易导致代码难以理解和维护。(不清楚整个回调链路,谁特么知道这个调用跑到哪里去了..)
1.2.3、Future接收多线程的执行结果
在并发编程中使用的非阻塞模型。用Future接收多线程的执行结果。主线程通过主动轮询future.get(),直到返回结果可用为止。
public static class Task implements Callable<String> {
@Override
public String call() throws Exception {
// 带上i分页请求第三方网页
response = Http.get(xxxxx);
// 处理第三方的数据
return handle(reposne);
}
}
//异步线程的结果Future。
List<Future<Object>> responseFutureList = new ArrayList<Future<Object>>();
ExecutorService es = Executors.newCachedThreadPool();
for(int i=1; i<=50;i++){
//提交50次线程
responseFutureList.add(es.submit(new Task()));
}
for(Future<String> res : responseFutureList){
System.out.println(res.get());
}
涉及多线程就有上下文切换的问题,上下文切换的成本很高。会导致资源利用率不高。
2、响应式编程
异步编程可以解决我们在获得爬取50页数据的阻塞问题。但各有缺陷...这个时候,就可以使用响应式编程了。
2.1、响应式编程的概念
前端用这玩意用的很多。但是响应式编程没必要和具体的编程领域关联,它只是一个普遍适用的概念和编程模型。
本质就是使用异步数据流进行编程,通过库和函数的方式提供一套处理异步数据流的接口规范。
响应式编程两个核心概念:
- 异步。不需要等待处理完成便立刻返回,通过回调将处理结果带回来,提高吞吐量、性能和效率。
- 数据流。按照时间线的时间序列,任何东西都可以是一个数据流,在这个基础上通过函数来组合、过滤和变换这些数据流。通过事件处理器来异步监听数据流。(观察者)
2.2、RxJava
RxJava是“用于组合异步和基于事件的程序的库”,RxJava是反应式编程的具体实现,受到了函数式编程以及数据流编程的影响。
也是我在学习也在使用的...
2.2.1、RxJava是如何运行的
RxJava的核心是Observable类型,给数据或事件的流。
目的是实现推送(反应式),也可以用于拉取(交互式)。
可以同步使用,也可以异步使用,代码随着时间推移产生的0个、1个、多个或者无穷个值或事件。
2.2.2、推送和拉取
RxJava实现反应式的要点在于它支持推送,所以Observable和关联的Observer类型签名支持把事件推送给它。这通常会伴随着异步。
Observable类型还支持一个异步的反馈通道,作为异步系统中的一种流控制或回压方式。
Observable/Observer通过订阅进行链接,Observable代表了数据流,它可以被Observer订阅。
interface Observable<T> {
Subscription subscribe(Observer s)
}
订阅之后,Observe就能够接收三种推送它的事件。
- 通过onNext()函数推送的数据
- 通过onError()函数推送的异常或Throwable
- 通过onCompleted()函数推送的流完成信息
interface Observer<T> {
void onNext(T t)
void onError(Throwable t)
void onCompleted()
}
onNext() 可能永远也不会被调用,也可能会被调用一次、多次或无数次。
onError()和 onCompleted() 是终端事件,这意味着两者只能有一个被调用,并且只能被调用一次。
在终端事件调用之后,Observable流就完成了,以后就不能在向它发送事件了。如果流是无限的,并且没有发生故障,那么终端事件可能永远不会发生。
学习来源:
https://www.zhihu.com/zvideo/1413144562139705344
《RxJava反应式编程》
评论区