Spring Webflux 入门


真是混沌的一个月呀,不管第几周啦,就当做六月的最后一个周吧。

这个周入门 Spring Webflux,归纳学习响应式编程,但只从两个最常用的两个类:Mono 和 Flux 入手,关注一下基本使用。


响应式编程(Reactive programming)是一种不好形容的编程模式,常见的描述语或定义语中会出现:异步、非阻塞、传递变化、支持背压。我觉得自己的理解还不到位,决定本篇不定义或是阐述响应式编程是什么,只入个门,之后回来补这部分功课。

抛开定义,仅从代码的形式上来看,响应式编程跟 JDK 8 中常见的 stream 流,在使用上很接近,例如下面这段响应式代码:

1
2
3
4
5
6
7
Mono.just(jsonParam)
.map(param -> {
log.info("随便打印点东西:{}", param);
return param.get("user");
})
.onErrorResume(Mono::error)
.subscribe();

我在工作中使用的响应式编程框架是 Spring Webflux,它基于 Reactor 3 实现(由 Spring 母公司 Pivotal 的某团队开发),论辈分可能是第四代,或者是第五代。

响应式编程的历史渊源有点复杂,简单来说,最早是微软一帮人在 2010 年写出了 Rx.NET,运行在 .NET 平台上,后来在各个平台上都做了类似的实现,例如 RxJs、RxJava等(Rx 是 ReactiveX 的缩写),其中 RxJava 就是在 Java 平台上的实现。后来出现了 Project Reactor、Akka-Streams 之类的响应式框架。在 2013 年末,Netflix、Pivotal、Typesafe 等公司开始牵头制定响应式流规范,并在 2015 年正式发布了 Reactive Streams 规范,这个规范直接表现在 JDK 9 JUC 包中的 Flow API。如今的响应式框架基本都实现了 Reactive Streams 规范,业界主流的是 RxJava 2 和 Reactor 3。

响应式编程的历史还可以参考这篇文章《Advanced Reactive Java》(偶然搜到的,不知是何方神圣)。

本篇学习的内容是 Reactor 中的 Mono 和 Flux 两个类的使用,我也懒得介绍这两个类了,下面的内容会很干。


Mono

创建

方法 示例 描述 备注
just Mono.just(user)… 把元素加入流中 元素不能是null,否则抛空指针
justOrEmpty Mono.justOrEmpty(user)… 把元素(可能为null)加入到流中 元素如果是null,返回Mono.empty()
Mono.justOrEmpty(optional)… 把元素(Optional对象)加入到流中 元素如果是空,返回Mono.empty()
defer Mono.defer(() -> Mono.error(new RuntimeException(“懒异常”)))… 把Mono元素加入流中,但是延时加载,直到有subscribe时才会加载元素 可以理解为懒加载,当subscribe时才加载元素(每次subscribe都会重新加载)。

例如有一个用法是,当需要定义异常Mono.error(),并不需要每次都创建,而是在出现异常时才加载,可以节省内存

deferWithContext Mono.deferWithContext(context -> Mono.just(“test”)))… 与defer()方法几乎完全相同,只是接受参数变成了Function,可以拿到流的context context来自reactor.util.context包,代表流的上下文,用法不明,待后续
fromCallable return Mono.fromCallable(() -> { Thread.sleep(1000 * 3); return “休眠了三秒”; })… 通过Callable获取元素加入到流中,是支持阻塞的一种实现 有很多类似的fromXXX()方法,例如fromFuture()、fromRunnable()等
empty Mono.empty()… 创建流,但流中没有任何元素 后续如果有flatmap()、map()等,都不会执行,但then()等可以执行,可以用于忽略后续数据
never Mono.never(); 永不停止,不搭理任何数据,就此睡死
error Mono.error(new RuntimeException(“失败”))… 将一个异常加入流中,并立即停止后续操作 参数是Throwable对象,error立即加载,无论是否用到
Mono.error(() -> new RuntimeException(“失败”)) 将一个异常加入流中,并立即停止后续操作 参数是Supplier函数,error懒加载
Mono.delay(Duration.ofSeconds(3))… 上来就延时 延时后返回0

转换

方法 示例 描述 备注
flatMap …flatMap(t -> {
return Mono.just(t); })…
将一个Mono转换成另一个Mono(异步) 参数是一个Function(接收Mono中的值,返回一个新的Mono)
map …map(t -> {
return t; })…
将Mono中的内容进行更改 参数是一个Function(接收Mono中的值,返回另一个值)
cast Mono.just(map) .cast(HashMap.class)… 将Mono中的对象,从A类转换成B类 等价于map(clazz::cast)
sequenceEqual Mono.sequenceEqual(monoA, monoB)… 比较两个Publisher(Mono、Flux)是否相同 返回的是一个Mono<Boolean>
Mono.sequenceEqual(monoA, monoB, (a, b) -> a.length() == b.length())… 比较两个Publisher(Mono、Flux)是否相同(可以指定条件)
zip Mono.zip(mono1, mono2, mono3)… 将多个Mono合并成一个Mono(内部是Tuple对象),可以通过tuple.getT1()等类似方式取出一个个内容 如果其中一个Mono发生异常,逻辑同抛出Mono.error(),如果有多个Mono发生异常,将按顺序取第一个异常
zipDelayError Mono.zipDelayError(mono1, mono2, mono3)… 功能上同zip(),但在处理异常的时候逻辑不同 如果多个Mono发生异常,将组合成一个新的异常

其他

方法 示例 描述 备注
retry Mono.just(10/0).retry(10)… 遇到异常重试 从头执行,而不是只执行上一步
有很多变种,参数总体与异常类型和重试次数有关
repeat Mono.just(“.”).repeat(10)… 重复(在非异常情况下),会从Mono变成重复元素的Flux 有很多变种,并且跟retry一样,都不算最开始的一次

Flux

跟 Mono 相同的就不写了,下面的内容全都是 Flux 独有的

创建

方法 示例 描述 备注
fromIterable Flux.fromIterable(list)… 从一个迭代器对象(实现Iterable接口的对象)中,把所有元素加入Flux中 类似的还有fromArray、fromStream
range Flux.range(20, 5)… 把一系列数字加入Flux中,这些数字间距为1 例如示例中加入Flux中的数字是20、21、22、23、24,从20开始,一共5个

转换

方法 示例 描述 备注
index Flux.fromIterable(list).index()… 给Flux内的多个元素加下标 例如从[“a”,”b”,”c”]变成[{0:”a”},{1:”b”},{2:”c”}],转换后每个T变成一个二元元祖Tuple2<Long, **T**>
startWith .startWith(list)… 将某些内容塞到Flux的最前面 比如原Flux里有[1,2,3],新插入[8,9],然后Flux变成[8,9,1,2,3]
.startWith(mono)… 同上,接收一个Publisher对象
.startWith(t1, t2, t3)… 同上,接收若干个和Flux中其他元素相同的对象
concatWith .concatWith(mono)… 将某些内容塞到Flux的最后面 与startWith的左右相反,但只接收Publisher对象
take Flux.fromIterable(list).take(10)… 只取前面几个元素,后面的舍弃 有很多种变种,例如前N个、某段时间之前、某条件没改变之前等
collect Flux.fromIterable(list).collect(Collectors.toList())… 把Flux中的多个元素封装成一个Mono集合 使用起来跟JDK 8的Stream一个逻辑,例如
list.stream().filter(num -> num > 10).collect(Collectors.toList());
collectList Flux.fromIterable(list).collectList()… 把Flux中的多个元素封装成一个Mono列表 例如从Flux<String>转变成Mono<List<String>>,还有很多类似的collect()方法
window Flux.just(1,2,3,4,5,6,7,8,9).window(3)… 把一个Flux拆成多个Flux,拆后的Flux的元素个数是window内的个数 例如左边示例代码,会把[1,2,3,4,5,6,7,8,9]转换成[1,2,3]、[4,5,6]、[7,8,9]

我发现 Mono 和 Flux 的 Api 实在是太多太多了……我都已经整理三周了,还是没有整理完,不能再拖下去了hhh

本篇文章先写到这里,之后用到哪个新的 Api 就回来接着补充。