1. 响应式编程基础
1.1. 什么是响应式编程?
响应式编程是一种编程范式,专注于数据流和变化传播。它的核心思想是:
-
数据流:将数据看作流动的序列(Stream),可以是有限的(如列表)或无限的(如实时事件流)。
-
异步非阻塞:通过异步方式处理数据流,避免阻塞线程,提高资源利用率。
-
变化传播:当数据流中的某个数据发生变化时,自动通知依赖它的组件进行更新。
响应式编程的目标是构建高效、弹性、可扩展的系统,特别适合处理高并发、实时数据处理的场景。
1.2. 阻塞式与非阻塞式的区别
阻塞式(Blocking)
-
特点:
-
线程在执行任务时会一直等待,直到任务完成。
-
例如,传统的
Servlet
模型在处理请求时,线程会一直阻塞,直到 I/O 操作(如数据库查询、网络请求)完成。
-
-
缺点:
-
线程资源浪费,无法高效处理高并发请求。
-
每个请求都需要一个独立的线程,线程数量受限于操作系统。
-
非阻塞式(Non-blocking)
-
特点:
-
线程不会等待任务完成,而是继续执行其他任务。
-
例如,在响应式编程中,I/O 操作会立即返回一个
Mono
或Flux
,线程可以继续处理其他任务,等到 I/O 操作完成后再通知线程处理结果。
-
-
优点:
-
线程资源利用率高,适合高并发场景。
-
少量线程即可处理大量请求。
-
1.3. Reactive Streams 标准
Reactive Streams 是一个规范,定义了响应式编程的核心接口和规则,目的是实现异步数据流的标准化。它的核心组件包括:
-
Publisher(发布者):
-
数据流的源头,负责生成数据。
-
例如,
Flux
和Mono
都是Publisher
的实现。
-
-
Subscriber(订阅者):
-
数据流的消费者,负责处理数据。
-
通过
onNext
、onError
、onComplete
方法接收数据流的事件。
-
-
Subscription(订阅):
-
连接
Publisher
和Subscriber
的桥梁。 -
通过
request(n)
方法控制数据流的请求量(背压机制)。
-
-
Backpressure(背压):
-
一种流量控制机制,用于解决生产者和消费者速度不匹配的问题。
-
消费者通过
Subscription
告诉生产者需要多少数据,避免数据积压。
-
1.4. Reactive Streams 与 Spring WebFlux 的关系
Spring WebFlux 是 Spring 5 引入的响应式 Web 框架,它的核心设计基于 Reactive Streams 标准。以下是它们之间的关系:
Spring WebFlux 的响应式模型
-
基于 Reactive Streams:
-
Spring WebFlux 使用 Reactor 库(实现了 Reactive Streams 标准)作为其响应式编程的核心。
-
Reactor 提供了
Mono
和Flux
两种数据类型,分别表示单值流和多值流。
-
-
非阻塞 I/O:
-
Spring WebFlux 使用非阻塞 I/O 模型,能够高效处理高并发请求。
-
例如,WebFlux 的
WebClient
是一个非阻塞的 HTTP 客户端,基于 Reactive Streams 实现。
-
WebFlux 的核心组件
-
Handler 和 Router:
-
WebFlux 提供了两种编程模型:基于注解的控制器和函数式端点。
-
无论是哪种模型,底层都依赖于 Reactive Streams 的
Publisher
和Subscriber
。
-
-
WebClient:
-
WebFlux 的
WebClient
是一个非阻塞的 HTTP 客户端,基于 Reactive Streams 实现。 -
它返回
Mono
或Flux
,支持背压机制。
-
背压机制的应用
-
WebFlux 的背压支持:
-
WebFlux 在处理数据流时,会自动应用背压机制,避免生产者过快导致消费者无法处理。
-
例如,当客户端请求大量数据时,WebFlux 会根据消费者的处理能力动态调整数据流的发送速度。
-
1.5. 响应式编程的应用场景
响应式编程特别适合以下场景:
-
高并发请求:
-
例如,Web 服务器需要同时处理成千上万的请求。
-
-
实时数据处理:
-
例如,实时监控系统、股票行情推送、聊天应用。
-
-
异步任务处理:
-
例如,批量文件处理、异步消息队列。
-
-
流式数据处理:
-
例如,处理大文件或无限数据流(如日志流)。
-
1.6. 响应式编程与传统编程模型的对比
-
传统编程模型(阻塞式):
-
同步阻塞:线程会一直等待任务完成。
-
回调地狱:嵌套的回调函数导致代码难以维护。
-
资源浪费:线程在等待时无法处理其他任务。
-
-
响应式编程模型(非阻塞式):
-
异步非阻塞:线程不会等待任务完成,而是继续处理其他任务。
-
声明式编程:通过操作符(如
map
、flatMap
)描述数据流的处理逻辑,代码更简洁。 -
资源高效:线程资源利用率高,适合高并发场景。
-
示例对比:
-
传统编程(阻塞式):
String result = blockingHttpClient.get("https://example.com"); // 阻塞线程 System.out.println(result);
-
响应式编程(非阻塞式):
Mono<String> result = webClient.get().uri("https://example.com").retrieve().bodyToMono(String.class); // 非阻塞 result.subscribe(System.out::println); // 异步处理结果
1.7. Reactor 的调试工具
Reactor 提供了丰富的调试工具,帮助开发者理解和调试响应式流:
-
log()
操作符:在数据流的每个阶段打印日志,方便观察数据流的执行过程。
Flux.just("apple", "banana", "cherry").log() // 打印日志.map(String::toUpperCase).subscribe();
输出:
INFO - onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
INFO - request(unbounded)
INFO - onNext(apple)
INFO - onNext(banana)
INFO - onNext(cherry)
INFO - onComplete()
-
Hooks.onOperatorDebug()
:启用调试模式,打印更详细的堆栈信息。
Hooks.onOperatorDebug();
Flux.just(1, 2, 3).map(n -> n / 0) // 模拟错误.subscribe();
-
StepVerifier
:用于测试响应式流的行为。
StepVerifier.create(Flux.just(1, 2, 3)).expectNext(1, 2, 3).verifyComplete();
2. WebFlux 核心功能
2.1 WebFlux 注解模型
Spring WebFlux 提供了基于注解的编程模型,类似于 Spring MVC,但支持响应式编程。以下是 WebFlux 注解模型的详细讲解:
2.1.1. 注解驱动开发
WebFlux 的注解模型与 Spring MVC 类似,但支持响应式类型(如 Mono
和 Flux
)。以下是核心注解及其用法:
核心注解
-
@RestController
:-
用于标记一个类为控制器,处理 HTTP 请求。
-
返回的数据会自动序列化为 JSON 或 XML。
-
@RestController
public class MyController {@GetMapping("/hello")public Mono<String> hello() {return Mono.just("Hello, WebFlux!");}
}
-
@RequestMapping
:-
用于映射 HTTP 请求到控制器方法。
-
可以指定路径、请求方法、请求头等。
-
@RestController
@RequestMapping("/api")
public class MyController {@RequestMapping(value = "/hello", method = RequestMethod.GET)public Mono<String> hello() {return Mono.just("Hello, WebFlux!");}
}
HTTP 方法注解
-
@GetMapping
:-
处理 HTTP GET 请求。
-
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id);
}
-
@PostMapping
:-
处理 HTTP POST 请求。
-
@PostMapping("/users")
public Mono<User> createUser(@RequestBody User user) {return userService.saveUser(user);
}
-
@PutMapping
、@DeleteMapping
、@PatchMapping
:-
分别用于处理 HTTP PUT、DELETE 和 PATCH 请求。
-
参数绑定注解
-
@PathVariable
:-
用于从 URL 路径中提取变量。
-
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id);
}
-
@RequestParam
:-
用于从查询参数中提取值。
-
@GetMapping("/users")
public Flux<User> getUsers(@RequestParam String name) {return userService.findUsersByName(name);
}
-
@RequestBody
:-
用于将请求体绑定到方法参数。
-
@PostMapping("/users")
public Mono<User> createUser(@RequestBody User user) {return userService.saveUser(user);
}
-
@ModelAttribute
:-
用于将表单数据绑定到对象。
-
@PostMapping("/users")
public Mono<User> createUser(@ModelAttribute User user) {return userService.saveUser(user);
}
-
@SessionAttribute
:-
用于从会话中提取属性。
-
@GetMapping("/profile")
public Mono<User> getProfile(@SessionAttribute User user) {return Mono.just(user);
}
请求头和 Cookie 注解
-
@RequestHeader
:-
用于从请求头中提取值。
-
@GetMapping("/hello")
public Mono<String> hello(@RequestHeader("User-Agent") String userAgent) {return Mono.just("User-Agent: " + userAgent);
}
-
@CookieValue
:-
用于从 Cookie 中提取值。
-
@GetMapping("/hello")
public Mono<String> hello(@CookieValue("JSESSIONID") String sessionId) {return Mono.just("Session ID: " + sessionId);
}
使用 ServerWebExchange
获取完整的请求上下文
-
ServerWebExchange
:-
提供了对 HTTP 请求和响应的完全访问。
-
可以获取请求头、Cookie、会话等信息。
-
@GetMapping("/info")
public Mono<String> getInfo(ServerWebExchange exchange) {String path = exchange.getRequest().getPath().value();return Mono.just("Request Path: " + path);
}
2.1.2. 控制器方法的返回类型
Mono<T>
和 Flux<T>
-
Mono<T>
:-
表示一个异步的单值流。
-
适用于返回单个结果的场景。
-
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id);
}
-
Flux<T>
:-
表示一个异步的多值流。
-
适用于返回多个结果的场景。
-
@GetMapping("/users")
public Flux<User> getUsers() {return userService.findAllUsers();
}
与传统阻塞式返回类型的对比
-
传统阻塞式返回类型(如
String
):-
线程会阻塞,直到任务完成。
-
@GetMapping("/hello")
public String hello() {return "Hello, World!";
}
-
响应式返回类型(如
Mono<String>
):-
线程不会阻塞,可以继续处理其他任务。
-
@GetMapping("/hello")
public Mono<String> hello() {return Mono.just("Hello, WebFlux!");
}
2.1.3. 异常处理
自定义全局异常处理:@ControllerAdvice
-
@ControllerAdvice
:-
用于定义全局异常处理逻辑。
-
@ControllerAdvice
public class GlobalExceptionHandler {@ExceptionHandler(UserNotFoundException.class)public Mono<ResponseEntity<String>> handleUserNotFoundException(UserNotFoundException ex) {return Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND).body(ex.getMessage()));}
}
异步异常捕获
-
在响应式编程中,异常可以通过
onErrorResume
、onErrorReturn
等方法捕获和处理。
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id).onErrorResume(ex -> Mono.just(new User("default", "Default User")));
}
2.2 WebFlux 函数式模型
ServerRequest
:
表示 HTTP 请求,提供了访问请求头、参数、正文等方法。
HandlerFunction<ServerResponse> helloHandler = request -> {String name = request.queryParam("name").orElse("World");return ServerResponse.ok().bodyValue("Hello, " + name + "!"); };
ServerResponse
:
表示 HTTP 响应,提供了构建响应状态码、头信息、正文等方法。
HandlerFunction<ServerResponse> helloHandler = request -> ServerResponse.ok().bodyValue("Hello, WebFlux!");
2.1.1 HandlerFunction
定义:
-
HandlerFunction
是一个函数式接口,用于处理 HTTP 请求并生成响应。 -
它的核心方法是
Mono<ServerResponse> handle(ServerRequest request)
。
作用:
-
实现具体的业务逻辑。
-
接收
ServerRequest
,返回ServerResponse
。
示例:
HandlerFunction<ServerResponse> helloHandler = request -> ServerResponse.ok().bodyValue("Hello, WebFlux!");
这里的意思是创建了个
HandlerFunction
类型的变量 helloHandler,这个helloHandler是怎么生成的,是通过接受一个request作为参数,并且通过ServerResponse返回Mono<ServerResponse>。
2.2.1 RouterFunction
定义:
-
RouterFunction
是一个函数式接口,用于定义路由规则。 -
它的核心方法是
Mono<HandlerFunction> route(ServerRequest request)
。
作用:
-
将 HTTP 请求映射到对应的
HandlerFunction
。 -
根据请求的路径、方法等条件,选择合适的
HandlerFunction
来处理请求。
RouterFunctions.route()
-
定义:
-
RouterFunctions.route
是一个静态方法,用于创建RouterFunction
。
-
-
参数:
-
第一个参数是请求谓词(如
GET
、POST
)。 -
第二个参数是
HandlerFunction
。
-
RouterFunction<ServerResponse> route = RouterFunctions.route(GET("/hello"), request -> ServerResponse.ok().bodyValue("Hello, WebFlux!")
);
这里的意思也是创建了个RouterFunction类型的变量route,调用了它自身的静态方法RouterFunctions.route(将/hello请求路由到这里),接受一个request作为参数,并且返回一个Mono<HandlerFunction>,这里第二行的 request -> ServerResponse.ok().bodyValue("Hello, WebFlux!")一般是用一个
HandlerFunction
类的对象来替代。
2.2.3. HandlerFunction
和 RouterFunction
的关系
核心关系
-
RouterFunction
是路由的入口:-
RouterFunction
负责根据请求的路径、方法等条件,选择合适的HandlerFunction
来处理请求。 -
它类似于一个路由器,将请求分发到对应的处理器。
-
-
HandlerFunction
是业务逻辑的实现:-
HandlerFunction
负责处理具体的业务逻辑,并生成响应。 -
它类似于一个控制器方法,专注于处理请求并返回结果。
-
工作流程
-
请求到达:
-
当一个 HTTP 请求到达时,WebFlux 会调用
RouterFunction
的route
方法。
-
-
路由匹配:
-
RouterFunction
根据请求的路径、方法等条件,匹配对应的HandlerFunction
。
-
-
处理请求:
-
如果匹配成功,
RouterFunction
返回对应的HandlerFunction
。 -
WebFlux 调用
HandlerFunction
的handle
方法处理请求。
-
-
生成响应:
-
HandlerFunction
处理请求并生成ServerResponse
。 -
WebFlux 将
ServerResponse
转换为 HTTP 响应并返回给客户端。
-
示例代码
以下是一个完整的示例,展示了 RouterFunction
和 HandlerFunction
的协同工作:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.*;@Configuration
public class WebConfig {// 定义 HandlerFunctionHandlerFunction<ServerResponse> helloHandler = request -> ServerResponse.ok().bodyValue("Hello, WebFlux!");HandlerFunction<ServerResponse> goodbyeHandler = request -> ServerResponse.ok().bodyValue("Goodbye, WebFlux!");// 定义 RouterFunction@Beanpublic RouterFunction<ServerResponse> router() {return RouterFunctions.route(GET("/hello"), helloHandler).andRoute(GET("/goodbye"), goodbyeHandler);}
}
-
RouterFunction
:-
定义了两条路由规则:
-
/hello
请求由helloHandler
处理。 -
/goodbye
请求由goodbyeHandler
处理。
-
-
-
HandlerFunction
:-
helloHandler
返回 "Hello, WebFlux!"。 -
goodbyeHandler
返回 "Goodbye, WebFlux!"。
-
2.3.4 RouterFunctions.nest()
实现路由嵌套
路由嵌套的作用
-
将一组相关的路由规则嵌套在一个公共路径下,提高代码的可读性和可维护性。
-
例如,将所有
/api
开头的请求嵌套在一起。
示例代码
@Bean
public RouterFunction<ServerResponse> nestedRouter() {return RouterFunctions.nest(path("/api"), // 公共路径RouterFunctions.route(GET("/users"), request -> ServerResponse.ok().bodyValue("User List")).andRoute(GET("/products"), request -> ServerResponse.ok().bodyValue("Product List")));
}
-
RouterFunctions.nest
:-
将
/users
和/products
路由嵌套在/api
路径下。 -
最终的路由路径为:
-
/api/users
-
/api/products
-
-
2.3.5 HandlerFilterFunction
实现请求拦截和过滤
HandlerFilterFunction
的作用
-
在请求到达
HandlerFunction
之前或之后执行逻辑。 -
常见的应用场景包括:
-
身份验证
-
日志记录
-
异常处理
-
示例代码
// 定义一个日志过滤器
HandlerFilterFunction<ServerResponse, ServerResponse> logFilter = (request, next) -> {System.out.println("Request Path: " + request.path());return next.handle(request);
};// 应用过滤器
@Bean
public RouterFunction<ServerResponse> filteredRouter() {return RouterFunctions.route(GET("/hello"), request -> ServerResponse.ok().bodyValue("Hello, WebFlux!")).filter(logFilter); // 添加过滤器
}
-
logFilter
:-
在请求到达
HandlerFunction
之前,打印请求路径。 -
调用
next.handle(request)
继续处理请求。
-
3. WebFlux 的数据处理
单值流(Mono)和多值流(Flux)
Mono<T>
表示异步序列中最多只包含一个元素(或一个错误)的流。
适合处理如单个对象查询、单个文件上传结果等场景。Mono<String> mono = Mono.just("Hello, Mono!"); mono.subscribe(System.out::println);
Flux<T>
表示异步序列中零个或多个元素(或一个错误)的流。
适合处理如列表查询、批量处理、流式数据处理等场景。Flux<Integer> flux = Flux.just(1, 2, 3, 4); flux.subscribe(System.out::println);
总结表格:最常用操作
操作 Mono 示例 Flux 示例 创建 Mono.just("Hello")
Flux.just("A", "B")
转换 mono.map(s -> s.length())
flux.map(s -> s.toUpperCase())
异步转换 mono.flatMap(id -> findUser(id))
flux.flatMap(s -> queryAsync(s))
过滤 - flux.filter(s -> s.startsWith("A"))
错误处理 mono.onErrorReturn("default")
flux.onErrorResume(e -> backupFlux)
组合流 Mono.zip(mono1, mono2)
flux1.mergeWith(flux2)
3.1 Mono 操作
3.1.1. 创建 Mono
(1) Mono.just(T data)
-
作用:包装一个已知的值。
-
场景:当数据已经存在时。
Mono<String> mono = Mono.just("Hello"); // 直接包含"Hello"
(2) Mono.empty()
-
作用:创建一个不发射任何数据的
Mono
。 -
场景:表示一个空操作(如删除数据后无需返回内容)。
Mono<Void> deleteOperation = Mono.empty(); // 常用于删除操作
(3) Mono.error(Throwable error)
-
作用:创建一个立即抛出错误的
Mono
。 -
场景:快速失败(如参数校验失败)。
Mono<String> errorMono = Mono.error(new RuntimeException("Bad Request"));
3.1.2. 转换 Mono
(1) map(Function<T, R> mapper)
-
作用:同步转换数据。
-
本质:直接对数据做计算,不涉及异步。
Mono<Integer> lengthMono = Mono.just("Hello").map(s -> s.length()); // 5
(2) flatMap(Function<T, Mono<R>> mapper)
-
作用:异步转换数据,返回新的
Mono
。 -
本质:用于链式调用另一个异步操作(如数据库查询)。
Mono<User> userMono = userIdMono.flatMap(id -> userRepository.findById(id));
(3) then(Mono<V> other)
-
作用:忽略当前结果,执行下一个
Mono
。 -
场景:顺序执行不依赖前一个结果的操作。
Mono<Void> logOperation = Mono.just("Processed").then(Mono.fromRunnable(() -> System.out.println("Done")));
3.1.3. 错误处理
(1) onErrorReturn(T fallbackValue)
-
作用:遇到错误时返回默认值。
-
场景:快速降级。
Mono<Integer> safeNumber = Mono.just("abc").map(Integer::parseInt).onErrorReturn(0); // 解析失败返回0
(2) onErrorResume(Function<Throwable, Mono<T>> fallback)
-
作用:捕获错误并返回另一个
Mono
。 -
场景:复杂降级逻辑(如调用备用服务)。
Mono<User> user = userRepository.findById(id).onErrorResume(e -> backupUserRepository.findById(id));
3.2 Flux 操作
3.2.1. 创建 Flux
(1) Flux.just(T... data)
-
作用:创建包含多个元素的流。
Flux<String> flux = Flux.just("A", "B", "C"); // 发射三个字符串
(2) Flux.fromIterable(Iterable<T> iterable)
-
作用:从集合(如
List
)创建流。 -
场景:将已有数据转换为流。
List<String> list = Arrays.asList("A", "B");
Flux<String> flux = Flux.fromIterable(list); // 发射"A", "B"
(3) Flux.range(int start, int count)
-
作用:生成整数序列。
Flux<Integer> numbers = Flux.range(1, 3); // 发射1, 2, 3
3.2.2. 转换 Flux
(1) map(Function<T, R> mapper)
-
作用:同步转换每个元素。
Flux<Integer> lengths = Flux.just("apple", "banana").map(String::length); // 5, 6
(2) flatMap(Function<T, Publisher<R>> mapper)
-
作用:异步转换每个元素为新的流,并合并所有流。
-
场景:并行处理元素(如为每个ID查询数据库)。
Flux<String> results = Flux.just(1, 2, 3).flatMap(id -> userRepository.findById(id)); // 异步查询每个用户
3.2.3. 过滤和选择
(1) filter(Predicate<T> predicate)
-
作用:过滤不符合条件的元素。
Flux<Integer> evenNumbers = Flux.range(1, 5).filter(n -> n % 2 == 0); // 2, 4
(2) take(long n)
-
作用:取前
n
个元素。 -
场景:分页查询或限制数据量。
Flux<Integer> firstTwo = Flux.range(1, 10).take(2); // 1, 2
3.2.4. 组合 Flux
(1) mergeWith(Publisher<T> other)
-
作用:合并两个流,元素按实际到达顺序交错发射。
-
场景:合并实时数据流(如多个传感器数据)。
Flux<String> flux1 = Flux.just("A", "B");
Flux<String> flux2 = Flux.just("C", "D");
Flux<String> merged = flux1.mergeWith(flux2); // 可能输出 A, C, B, D
(2) zipWith(Publisher<T2> other, BiFunction<T, T2, R> combinator)
-
作用:将两个流的元素按位置一一合并。
-
场景:组合关联数据(如用户名和年龄)。
Flux<String> names = Flux.just("John", "Jane");
Flux<Integer> ages = Flux.just(30, 25);
Flux<String> zipped = names.zipWith(ages, (name, age) -> name + ":" + age);
// 输出 John:30, Jane:25
3.2.5. 错误处理
(1) onErrorReturn(T fallbackValue)
-
作用:遇到错误时返回默认值并终止流。
Flux<Integer> safeNumbers = Flux.just("1", "abc", "3").map(Integer::parseInt).onErrorReturn(-1); // 输出1, -1
(2) onErrorResume(Function<Throwable, Publisher<T>> fallback)
-
作用:捕获错误并切换到备用流。
Flux<Integer> numbers = Flux.just(1, 2, 0).map(n -> 10 / n).onErrorResume(e -> Flux.just(-1, -2)); // 输出1, 5, -1, -2
3.3 核心理解要点
3.3.1 基础知识
1. Mono vs Flux
-
Mono
:代表 0或1个元素 的流(如查询单个用户)。 -
Flux
:代表 0到N个元素 的流(如查询用户列表)。
2. 同步 vs 异步
-
map
:同步操作(立即执行,不切换线程)。 -
flatMap
:异步操作(可能切换线程,返回新的流)。
3. 链式调用
所有操作符可以像流水线一样链式调用:
Flux.just("apple", "banana").filter(s -> s.length() > 5).map(String::toUpperCase).subscribe(System.out::println); // 输出 "BANANA"
3.3.2 订阅 subscribe()
1. 什么是 subscribe()
?
-
定义:
subscribe()
是 订阅数据流 的方法,调用后数据流开始执行。 -
核心作用:触发数据流的执行,并定义如何处理数据、错误和完成信号。
2. 为什么需要 subscribe()
?
-
在响应式编程中,数据流是 惰性 的,只有调用
subscribe()
时,数据流才会开始执行。 -
如果没有调用
subscribe()
,数据流只是一段定义好的逻辑,不会实际运行。
3.subscribe()
的常用重载方法
Mono
和 Flux
的 subscribe()
方法有多个重载版本,以下是常用的两种:
(1) 无参数版本
-
作用:只订阅数据流,不处理任何数据或事件。
Flux.just("A", "B", "C").subscribe();
-
解释:数据流会执行,但不会对数据做任何处理。
(2) 带 Consumer<T>
参数
-
作用:订阅并处理每个数据项。
Flux.just("A", "B", "C").subscribe(data -> System.out.println("Received: " + data));
(3)带 Consumer<T>
和 Consumer<Throwable>
参数
-
作用:订阅并处理数据项和错误。
Flux.just("A", "B", "C").concatWith(Flux.error(new RuntimeException("Error"))).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));
(4)带 Consumer<T>
、Consumer<Throwable>
和 Runnable
参数
-
作用:订阅并处理数据项、错误和完成信号。
Flux.just("A", "B", "C").subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Completed!"));
(5)使用 BaseSubscriber
自定义订阅者
-
作用:完全控制订阅逻辑,包括
request
的数量,自定义背压逻辑。
Flux.range(1, 1000).subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 初始请求10个数据request(10);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("Received: " + value);// 每处理完一个数据,再请求1个request(1);}@Overrideprotected void hookOnComplete() {System.out.println("Completed!");}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable.getMessage());}});
-
解释:
-
hookOnSubscribe()
中调用request(10)
,表示初始请求 10 个数据。 -
hookOnNext()
中调用request(1)
,表示每处理完一个数据,再请求 1 个。
-
实际应用场景:分页查询
-
场景:从数据库分页查询数据,每次请求固定数量的数据。
Flux<User> users = userRepository.findAll().limitRate(100); // 每次查询100条数据users.subscribe(user -> System.out.println("User: " + user));
3.3.3 背压(Backpressure)
1. 什么是背压?
-
定义:背压是 数据流中生产者(Publisher)和消费者(Subscriber)之间的流量控制机制。
-
问题背景:
-
如果生产者生产数据的速度 > 消费者处理数据的速度,会导致消费者积压大量未处理的数据,最终可能内存溢出(OOM)。
-
背压的作用是让消费者告诉生产者:“我处理不过来了,慢一点!”
-
2. 背压是自动的吗?
在 Reactor(Spring WebFlux 的底层库)中,背压是自动支持的。
-
当你使用
Flux
或Mono
时,背压机制已经内置。 -
消费者通过
Subscription.request(n)
告诉生产者需要多少数据。
Flux.range(1, 1000) // 生产者生成1到1000的数字.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 每次请求10个数据request(10);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("Received: " + value);// 每处理完一个数据,再请求1个request(1);}});
-
这里消费者通过
request(n)
控制数据流的速度。 -
一般subscribe方法内部都是不用自己重写的,只有在需要自定义的情况下才需要自己定义request(n),不然一般都是直接subscribe()。
3. 背压策略
(1)缓冲(Buffer):将未处理的数据缓存起来(默认策略)(onBackpressureBuffer())
-
Flux.range(1, 1000).onBackpressureBuffer(100) // 设置缓冲区大小为100.subscribe(n -> {Thread.sleep(10); // 模拟处理速度慢System.out.println(n);});
(2)丢弃(Drop):直接丢弃无法处理的数据(onBackpressureDrop())
-
Flux.range(1, 1000).onBackpressureBuffer(100) // 设置缓冲区大小为100.subscribe(n -> {Thread.sleep(10); // 模拟处理速度慢System.out.println(n);});
(3)最新值(Latest):只保留最新的数据,丢弃旧数据(onBackpressureLatest())
-
Flux.range(1, 1000).onBackpressureLatest() // 只保留最新数据.subscribe(n -> {Thread.sleep(10); // 模拟处理速度慢System.out.println(n);});
3.3.4 冷流和热流
1.冷流(Cold Stream)——像 DVD 播放器
冷流的特点
-
每次订阅都从头开始:就像每次按播放键,DVD 都会从开头播放。
-
数据独立:每个订阅者看到的数据是完整的、独立的。
实际场景:数据库查询:
Flux<User> users = userRepository.findAll(); // 冷流// 第一次查询
users.subscribe(user -> System.out.println("用户1:" + user));// 第二次查询(重新执行SQL)
users.subscribe(user -> System.out.println("用户2:" + user));
-
两次查询会执行两次 SQL,数据互不影响。
2.热流(Hot Stream)——像电视台直播
热流的特点
-
数据实时广播:不管有没有人订阅,数据都在实时生成。
-
共享数据源:所有订阅者看到的是同一份实时数据。
实际场景:股票价格推送:
// 创建一个热流(每秒推送一个随机价格)
ConnectableFlux<Double> stockPrice = Flux.interval(Duration.ofSeconds(1)).map(i -> Math.random() * 100).publish(); // 转换为热流stockPrice.connect(); // 开始推送数据(即使没有订阅者)// 订阅者1(从当前时间点接收数据)
stockPrice.subscribe(price -> System.out.println("订阅者1:" + price));Thread.sleep(3000);// 订阅者2(3秒后加入,只能收到之后的数据)
stockPrice.subscribe(price -> System.out.println("订阅者2:" + price));
3.如何选择冷流 vs 热流?
场景 | 选择 | 原因 |
---|---|---|
需要重复查询数据库 | 冷流 | 每次查询都要获取最新数据 |
实时聊天消息 | 热流 | 所有用户共享同一份实时消息 |
文件下载 | 冷流 | 每个下载请求独立处理 |
传感器数据监控 | 热流 | 数据持续生成,多个监控端共享 |
3.3.5 自动订阅
定义
-
自动订阅 是指框架(如 Spring WebFlux)在特定条件下自动调用
subscribe()
方法,触发数据流的执行。 -
核心思想:开发者只需要定义数据流(如
Flux
或Mono
),框架会在合适的时机自动订阅并处理数据。
为什么需要自动订阅?
-
简化代码:开发者不需要手动调用
subscribe()
,减少样板代码。 -
统一管理:框架可以更好地管理数据流的生命周期(如错误处理、背压控制等)。
Spring WebFlux 中的自动订阅
(1) HTTP 请求处理
-
场景:当客户端发起 HTTP 请求时,Spring WebFlux 会自动订阅返回的
Flux
或Mono
。
@RestController
public class UserController {@GetMapping("/users")public Flux<User> getUsers() {return userRepository.findAll(); // 返回一个 Flux<User>}
}
-
解释:
-
当客户端访问
/users
时,Spring WebFlux 会自动调用subscribe()
,触发userRepository.findAll()
的执行。 -
数据流的结果会通过 HTTP 响应返回给客户端。
-
(2) WebSocket 通信
-
场景:在 WebSocket 通信中,Spring WebFlux 会自动订阅数据流并推送给客户端。
@RestController
public class WebSocketController {@MessageMapping("chat")public Flux<String> chat(Flux<String> messages) {return messages.map(msg -> "Echo: " + msg); // 返回一个 Flux<String>}
}
-
解释:
-
当客户端通过 WebSocket 发送消息时,Spring WebFlux 会自动订阅
messages
数据流。 -
处理后的消息会通过 WebSocket 推送给客户端。
-
(3) 响应式数据库查询
-
场景:在使用响应式数据库(如 R2DBC)时,Spring WebFlux 会自动订阅查询结果。
@Service
public class UserService {@Autowiredprivate UserRepository userRepository;public Flux<User> getAllUsers() {return userRepository.findAll(); // 返回一个 Flux<User>}
}
-
解释:
-
当调用
getAllUsers()
时,Spring WebFlux 会自动订阅userRepository.findAll()
。 -
查询结果会通过数据流返回。
-
自动订阅的底层原理
(1) Spring WebFlux 的工作流程
-
步骤 1:开发者定义数据流(如
Flux
或Mono
)。 -
步骤 2:Spring WebFlux 在接收到请求时,自动调用
subscribe()
。 -
步骤 3:数据流开始执行,结果通过 HTTP 响应或 WebSocket 返回。
(2) 自动订阅的触发条件
-
HTTP 请求:当控制器方法返回
Flux
或Mono
时。 -
WebSocket 通信:当
@MessageMapping
方法返回Flux
或Mono
时。 -
响应式数据库查询:当响应式仓库方法返回
Flux
或Mono
时。
3.3.6 自定义 Netty 线程池
配置 EventLoopGroup 线程数
通过自定义 HttpServerResources
,可以调整 EventLoopGroup
的线程数。
代码示例
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;@Configuration
public class NettyConfig {@Beanpublic WebServerFactoryCustomizer<NettyReactiveWebServerFactory> nettyWebServerFactoryCustomizer() {return factory -> factory.addServerCustomizers(httpServer -> {// 自定义 EventLoopGroup 线程池LoopResources loopResources = LoopResources.create("custom-loop", 4, 8, true);// 自定义 Worker 线程池(处理业务逻辑)ThreadFactory workerThreadFactory = new ThreadFactory() {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "worker-thread-" + counter.incrementAndGet());}};return httpServer.protocol(HttpProtocol.HTTP11).runOn(loopResources) // 指定 EventLoopGroup.workerTaskExecutor(java.util.concurrent.Executors.newFixedThreadPool(16, workerThreadFactory));});}
}
参数说明
-
LoopResources.create("custom-loop", 4, 8, true)
:-
4
:Boss Group 线程数(接收连接)。 -
8
:Worker Group 线程数(处理 I/O)。 -
true
:是否使用守护线程。
-
-
workerTaskExecutor
:自定义业务逻辑线程池,适合处理阻塞操作(如数据库访问)。
线程池选择策略
-
纯非阻塞操作:使用默认的
EventLoop
线程(无需额外配置)。 -
混合阻塞操作:使用
Schedulers.boundedElastic()
或自定义线程池隔离阻塞任务。
处理阻塞任务示例
import reactor.core.scheduler.Schedulers;public Mono<String> blockingOperation() {return Mono.fromCallable(() -> {// 模拟阻塞操作(如 JDBC 查询)Thread.sleep(1000);return "Blocking Result";}).subscribeOn(Schedulers.boundedElastic()); // 使用弹性线程池
}
4. Spring MVC转换成Spring WebFlux
第一步:修改依赖
把 Spring MVC 的依赖换成 WebFlux:
<!-- 删除这个 -->
<!-- <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency>
</dependencies>--><!-- 移除 spring-boot-starter-web,替换为 webflux --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- 响应式数据库驱动(例如 R2DBC) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>io.r2dbc</groupId><artifactId>r2dbc-postgresql</artifactId></dependency>
第二步: 修改 Controller层
-
Spring MVC:直接返回
User
或ResponseEntity<User>
。 -
WebFlux:返回
Mono<ResponseEntity<User>>
或Flux<ResponseEntity<User>>
。
Spring MVC 示例(阻塞式):
@GetMapping("/user/{id}")
public User getUser(@PathVariable String id) {return userService.findById(id); // 阻塞调用
}
Spring WebFlux 示例(非阻塞式):
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findById(id); // 返回 Mono<User>
}
关键修改点:
-
所有返回值必须包装为
Mono<T>
或Flux<T>
。 -
ResponseEntity
需要包裹在Mono
中:Mono<ResponseEntity<User>>
。 -
使用
map
转换结果,defaultIfEmpty
处理空值。
第三步:修改 Service 层
-
Spring MVC:方法返回
User
(阻塞操作)。 -
WebFlux:方法返回
Mono<User>
或Flux<User>
(非阻塞操作)。
阻塞式服务示例:
public User findById(String id) {return userRepository.findById(id); // 阻塞的 JPA 调用
}
非阻塞式服务示例:
public Mono<User> findById(String id) {return userRepository.findById(id); // 假设已使用响应式 Repository
}
第四步:修改数据库访问
-
如果用的是 MySQL/PostgreSQL → 换成 R2DBC(响应式驱动)。
-
如果用的是 MongoDB → 直接用 响应式 MongoDB。
Spring MVC(阻塞式)
使用 JPA 的阻塞式接口:
public interface UserRepository extends JpaRepository<User, String> {
}
Spring WebFlux(非阻塞式)
使用 R2DBC 或响应式 MongoDB:
// R2DBC(关系型数据库)
public interface ReactiveUserRepository extends ReactiveCrudRepository<User, String> {
}// 响应式 MongoDB
public interface ReactiveUserRepository extends ReactiveMongoRepository<User, String> {
}
第五步:处理阻塞代码
1. 处理返回 Mono
的阻塞代码
场景:调用一个阻塞的同步方法(如 JDBC 查询、传统 HTTP 客户端),返回单个结果。
关键操作:
-
Mono.fromCallable()
:将阻塞操作包装为Mono
。 -
subscribeOn(Schedulers.boundedElastic())
:指定阻塞操作在弹性线程池中执行,,避免占用事件循环线程。
示例:
public Mono<User> findUserBlocking(String id) {return Mono.fromCallable(() -> {// 这是一个阻塞的数据库查询(如 JDBC)return jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = ?", User.class, id);}).subscribeOn(Schedulers.boundedElastic());
}
2. 处理返回 Flux
的阻塞代码
场景:调用一个阻塞的同步方法(如遍历文件内容、批量数据库查询),返回多个结果。
关键操作:
-
Flux.generate()
或Flux.create()
:生成数据流。 -
flatMapSequential
或concatMap
:保持数据顺序。 -
subscribeOn(Schedulers.boundedElastic())
确保文件读取操作在弹性线程池中执行。
示例:遍历大文件并逐行处理
public Flux<String> readLargeFileBlocking(String filePath) {return Flux.generate(() -> Files.lines(Paths.get(filePath)).iterator(), (iterator, sink) -> {if (iterator.hasNext()) {sink.next(iterator.next());} else {sink.complete();}return iterator;}).subscribeOn(Schedulers.boundedElastic());
}
3. 通用原则
无论是 Mono
还是 Flux
,处理阻塞代码的核心原则是相同的:
(1) 隔离阻塞操作
-
使用
Schedulers.boundedElastic()
:-
专为阻塞操作设计的线程池,动态管理线程数量。
-
避免使用
Schedulers.parallel()
(用于 CPU 密集型任务)。
-
(2) 限制并发
-
控制线程池大小:避免无限制创建线程。
// 自定义线程池(可选) Scheduler blockingScheduler = Schedulers.newBoundedElastic(10, // 最大线程数100, // 任务队列容量"blocking-pool" );Mono.fromCallable(() -> blockingCall()).subscribeOn(blockingScheduler);
(3) 保持响应式链的纯净
-
避免在
map
/flatMap
中直接写阻塞代码:// 错误示例:阻塞代码在事件循环中执行! Flux.range(1, 10).map(i -> {Thread.sleep(1000); // 阻塞操作return i * 2;});// 正确示例:隔离阻塞操作 Flux.range(1, 10).flatMap(i -> Mono.fromCallable(() -> {Thread.sleep(1000); // 阻塞操作return i * 2;}).subscribeOn(Schedulers.boundedElastic()));
第六步:错误处理对比
Spring MVC(阻塞式)
使用 try-catch
或 @ExceptionHandler
:
@ExceptionHandler(Exception.class)
public ResponseEntity<String> handleError(Exception e) {return ResponseEntity.status(500).body("Error: " + e.getMessage());
}
Spring WebFlux(非阻塞式)
使用 Reactor 的错误处理操作符:
@GetMapping("/user/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {return userService.findById(id).map(user -> ResponseEntity.ok(user)).onErrorResume(e -> Mono.just(ResponseEntity.status(500).build())); // 异常时返回500
}
常用操作符:
-
onErrorReturn(T fallback)
:直接返回默认值。 -
onErrorResume(Function<Throwable, Mono<T>> fallback)
:返回另一个Mono
。 -
onErrorMap(Function<Throwable, Throwable> mapper)
:转换异常类型。
第七步:集合数据处理对比
Spring MVC(阻塞式)
返回 List<User>
:
@GetMapping("/users")
public List<User> getAllUsers() {return userService.findAll(); // 阻塞操作
}
Spring WebFlux(非阻塞式)
返回 Flux<User>
:
@GetMapping("/users")
public Flux<User> getAllUsers() {return userService.findAll(); // 非阻塞操作
}
流式响应示例:
@GetMapping(value = "/users-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> getUsersStream() {return userService.findAll().delayElements(Duration.ofSeconds(1)); // 每秒发送一个用户
}
第八步:性能优化与注意事项
背压(Backpressure)管理
-
默认策略:Reactor 会自动处理背压,但可以手动控制:
Flux.range(1, 1000).onBackpressureBuffer(50) // 缓冲区大小为50.subscribe();
-
策略选择:
-
onBackpressureDrop()
:丢弃无法处理的数据。 -
onBackpressureLatest()
:只保留最新数据。
-
线程池配置
-
非阻塞操作:使用默认的
Schedulers.parallel()
。 -
阻塞操作:使用
Schedulers.boundedElastic()
。
第九步:测试代码对比
Spring MVC(阻塞式)
使用 MockMvc
:
@SpringBootTest
@AutoConfigureMockMvc
class UserControllerTest {@Autowiredprivate MockMvc mockMvc;@Testvoid testGetUser() throws Exception {mockMvc.perform(get("/user/1")).andExpect(status().isOk()).andExpect(jsonPath("$.name").value("Alice"));}
}
Spring WebFlux(非阻塞式)
使用 WebTestClient
:
@SpringBootTest
class UserControllerTest {@Autowiredprivate WebTestClient webTestClient;@Testvoid testGetUser() {webTestClient.get().uri("/user/1").exchange().expectStatus().isOk().expectBody().jsonPath("$.name").isEqualTo("Alice");}
}
5. WebClient 在 Spring WebFlux 中的使用
5.1. WebClient 的核心作用
WebClient 是 Spring WebFlux 提供的 非阻塞式 HTTP 客户端,用于替代传统的 RestTemplate
。
-
非阻塞特性:基于 Reactor Netty,所有操作通过事件循环(Event Loop)异步执行,无需阻塞线程等待响应。
-
响应式流支持:直接返回
Mono
(单个结果)或Flux
(多个结果),与 Spring WebFlux 的响应式编程模型无缝集成。 -
背压支持:自动控制数据流速率,避免生产者压垮消费者。
为什么需要从 RestTemplate 迁移到 WebClient?
-
传统问题(Spring MVC + RestTemplate):
-
阻塞式调用:每个 HTTP 请求占用一个线程,线程池资源有限(如 Tomcat 默认 200 线程),高并发时容易耗尽线程。
-
同步处理:代码必须等待 HTTP 响应返回后才能继续执行,无法高效利用 CPU 和网络资源。
-
-
解决方案(Spring WebFlux + WebClient):
-
非阻塞 I/O:基于 Reactor Netty,使用事件循环模型,少量线程处理大量并发请求。
-
响应式流整合:直接返回
Mono
/Flux
,与响应式服务无缝衔接。 -
资源高效:无需为每个请求分配线程,适合微服务和云原生架构。
-
5.2. 创建与配置 WebClient
(1) 基础配置
// 默认配置(无基础 URL)
WebClient webClient = WebClient.create();// 自定义配置(推荐)
WebClient webClient = WebClient.builder().baseUrl("https://api.example.com") // 设置基础 URL(后续请求可省略域名).defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // 默认请求头.filter((request, next) -> { // 全局过滤器(如日志、认证)System.out.println("Request URL: " + request.url());return next.exchange(request);}).build();
详细说明:
-
baseUrl
:所有请求的前缀,例如/users
实际访问https://api.example.com/users
。 -
defaultHeader
:为所有请求添加默认请求头(如设置Content-Type
)。 -
filter
:拦截请求和响应,可用于日志记录、添加认证令牌等。
(2) 配置超时和连接池
HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(5)) // 响应超时时间.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); // 连接超时时间(3秒)WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)) // 绑定 HttpClient.build();
参数说明:
-
responseTimeout
:等待服务器响应的最大时间,超时后抛出ReadTimeoutException
。 -
CONNECT_TIMEOUT_MILLIS
:建立 TCP 连接的超时时间。
5.3. 发起 HTTP 请求
(1) GET 请求:获取单个资源
Mono<User> userMono = webClient.get().uri("/users/{id}", 1) // 路径参数.retrieve() // 发起请求并获取响应.bodyToMono(User.class); // 将响应体解析为 User 对象// 调用示例
userMono.subscribe(user -> System.out.println("User: " + user),error -> System.err.println("Error: " + error)
);
详细说明:
-
uri("/users/{id}", 1)
:路径参数占位符,{id}
会被替换为1
。 -
retrieve()
:执行请求并获取响应。 -
bodyToMono(User.class)
:将响应体解析为User
对象,返回Mono<User>
。
(2) GET 请求:获取资源集合
Flux<User> usersFlux = webClient.get().uri("/users").retrieve().bodyToFlux(User.class); // 解析为 Flux<User>// 调用示例
usersFlux.subscribe(user -> System.out.println("Received user: " + user),error -> System.err.println("Error: " + error),() -> System.out.println("All users received")
);
关键点:
-
bodyToFlux
:用于解析多个对象的场景(如返回 JSON 数组)。
(3) POST 请求:提交数据
User newUser = new User("Alice");
Mono<User> createdUser = webClient.post().uri("/users").bodyValue(newUser) // 设置请求体.retrieve().bodyToMono(User.class); // 解析响应体// 调用示例
createdUser.subscribe(user -> System.out.println("Created user: " + user),error -> System.err.println("Error: " + error)
);
详细说明:
-
bodyValue(newUser)
:将对象序列化为 JSON 并作为请求体发送。 -
默认使用 Jackson 库进行 JSON 序列化/反序列化。
(4) PUT 请求:更新资源
User updatedUser = new User("Alice Updated");
Mono<Void> updateResult = webClient.put().uri("/users/{id}", 1).bodyValue(updatedUser).retrieve().bodyToMono(Void.class); // 无响应体时使用updateResult.subscribe(() -> System.out.println("User updated"),error -> System.err.println("Error: " + error)
);
(5) DELETE 请求:删除资源
Mono<Void> deleteResult = webClient.delete().uri("/users/{id}", 1).retrieve().bodyToMono(Void.class);deleteResult.subscribe(() -> System.out.println("User deleted"),error -> System.err.println("Error: " + error)
);
(6)如何传递请求头?
使用 .headers()
方法添加自定义头:
webClient.get().uri("/users/{userId}", userId).header("X-Auth-Token", "my-token").retrieve().bodyToMono(User.class);
5.4. 处理响应与错误
(1) 处理 HTTP 状态码
Mono<User> userMono = webClient.get().uri("/users/{id}", 1).exchangeToMono(response -> { // 手动处理完整响应if (response.statusCode().is2xxSuccessful()) {return response.bodyToMono(User.class);} else if (response.statusCode() == HttpStatus.NOT_FOUND) {return Mono.error(new UserNotFoundException("User not found"));} else {return Mono.error(new RuntimeException("Server error"));}});
关键点:
-
exchangeToMono
:获取完整的响应对象(包含状态码、头信息),适合需要精细控制的场景。 -
onStatus()
简化错误处理:webClient.get().uri("/users/{id}", 1).retrieve().onStatus(HttpStatusCode::is4xxClientError, response ->Mono.error(new ClientErrorException())).onStatus(HttpStatusCode::is5xxServerError, response ->Mono.error(new ServerErrorException())).bodyToMono(User.class);
(2) 重试机制
webClient.get().uri("/users/{id}", 1).retrieve().bodyToMono(User.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 指数退避重试
参数说明:
-
maxAttempts(3)
:最多重试 3 次。 -
backoff(Duration.ofSeconds(1))
:每次重试间隔递增(1s, 2s, 4s)。
(3) 超时控制
webClient.get().uri("/users/{id}", 1).retrieve().bodyToMono(User.class).timeout(Duration.ofSeconds(5)) // 设置超时时间.onErrorResume(TimeoutException.class, e ->Mono.error(new ServiceUnavailableException()));
5.5. 复杂场景处理
(1) 文件上传
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", new FileSystemResource("test.txt")); // 添加文件
builder.part("name", "My File"); // 添加表单字段Mono<String> result = webClient.post().uri("/upload").body(BodyInserters.fromMultipartData(builder.build())) // 构建 Multipart 请求.retrieve().bodyToMono(String.class);
(2) 流式响应处理(Server-Sent Events)
Flux<String> eventStream = webClient.get().uri("/events").accept(MediaType.TEXT_EVENT_STREAM) // 声明接收 SSE 流.retrieve().bodyToFlux(String.class);eventStream.subscribe(event -> System.out.println("Received event: " + event),error -> System.err.println("Error: " + error),() -> System.out.println("Stream completed")
);
5.6. 异步流整合
(1) 链式调用多个请求
Mono<Order> orderMono = webClient.get().uri("/users/{id}", 1).retrieve().bodyToMono(User.class).flatMap(user -> webClient.post().uri("/orders").bodyValue(new Order(user.getId())).retrieve().bodyToMono(Order.class));
关键点:
-
flatMap
:将前一个Mono
的结果传递给下一个异步操作。
(2) 并行调用多个请求
Mono<User> userMono = webClient.get().uri("/users/1").retrieve().bodyToMono(User.class);
Mono<Order> orderMono = webClient.get().uri("/orders/1").retrieve().bodyToMono(Order.class);// 合并结果
Mono<UserOrder> userOrderMono = Mono.zip(userMono, orderMono).map(tuple -> new UserOrder(tuple.getT1(), tuple.getT2()));
说明:
-
Mono.zip
:并行执行多个请求,所有结果就绪后合并。
5.7. 总结
步骤 | Spring MVC (RestTemplate) | Spring WebFlux (WebClient) |
---|---|---|
发起请求 | restTemplate.getForObject() | webClient.get().retrieve().bodyToMono() |
错误处理 | try-catch | .onStatus() + .onErrorResume() |
超时控制 | 手动设置线程超时 | .timeout(Duration) |
文件上传 | MultiValueMap + postForEntity() | MultipartBodyBuilder + BodyInserters |
流式处理 | 不支持 | 支持 Server-Sent Events (SSE) |
6. Spring WebFlux 文件上传与下载详细指南
6.1. 文件上传
6.1.1 添加依赖
确保项目中包含 spring-boot-starter-webflux
:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
6.1.2 配置上传限制
在 application.yml
中配置最大文件大小和请求大小:
spring:webflux:multipart:max-file-size: 10MB # 单个文件最大大小max-request-size: 20MB # 整个请求最大大小
6.1.3 接收单个文件上传
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.FilePart;
import reactor.core.publisher.Mono;@PostMapping("/upload")
public Mono<String> uploadFile(@RequestPart("file") FilePart filePart) {// 获取文件名(需防范路径遍历攻击)String fileName = filePart.filename().replaceAll("[^a-zA-Z0-9.-]", "_");Path path = Paths.get("uploads", fileName);// 将文件内容写入磁盘(非阻塞方式)return DataBufferUtils.write(filePart.content(), path, StandardOpenOption.CREATE).then(Mono.just("File uploaded: " + fileName));
}
关键点:
-
@RequestPart("file")
:绑定上传的文件参数。 -
文件名清洗:替换非法字符,防止路径遍历攻击。
-
DataBufferUtils.write
:非阻塞写入文件。
单个文件上传:
DataBufferUtils.write
参数详解方法签名
public static Mono<Void> write(Publisher<DataBuffer> source, // 输入的数据流(通常是 FilePart.content())Path destination, // 目标文件路径OpenOption... options // 文件打开选项(如 CREATE、WRITE) )
参数解释
source: Publisher<DataBuffer>
数据来源,通常为
FilePart.content()
,表示上传文件的二进制数据流。
FilePart
是 Spring 对上传文件的抽象,content()
方法返回Flux<DataBuffer>
。
destination: Path
文件保存路径,需使用
Paths.get
创建。示例:
Path path = Paths.get("uploads", fileName);
options: OpenOption...
控制文件写入方式的选项,常用值:
StandardOpenOption.CREATE
:如果文件不存在则创建。
StandardOpenOption.TRUNCATE_EXISTING
:如果文件存在则清空。
StandardOpenOption.WRITE
:允许写入。
6.1.4 接收多个文件上传
@PostMapping("/upload-multi")
public Mono<String> uploadMultipleFiles(@RequestPart("files") List<FilePart> fileParts) {return Flux.fromIterable(fileParts).flatMap(filePart -> {String fileName = sanitizeFilename(filePart.filename());Path path = Paths.get("uploads", fileName);return DataBufferUtils.write(filePart.content(), path, StandardOpenOption.CREATE);}).then(Mono.just("All files uploaded"));
}private String sanitizeFilename(String filename) {return filename.replaceAll("[^a-zA-Z0-9.-]", "_");
}
6.1.5 处理大文件(分块上传与进度监控)
分块上传代码
@PostMapping(value = "/upload-large", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Mono<String> uploadLargeFile(@RequestBody Flux<PartEvent> partEvents) {return partEvents// 按文件分块(直到遇到最后一个块).windowUntil(PartEvent::isLast) // 按顺序处理每个块.concatMap(partEventFlux -> partEventFlux// 过滤出文件数据块.ofType(FilePartEvent.class)// 提取数据内容.map(FilePartEvent::content)// 合并所有 DataBuffer 为单个缓冲区.as(content -> DataBufferUtils.join(content)))// 处理合并后的数据块.flatMap(dataBuffer -> {return Mono.fromRunnable(() -> {try (InputStream is = dataBuffer.asInputStream(true)) {// 将数据追加写入文件Files.copy(is, Paths.get("uploads/large-file.dat"), StandardCopyOption.REPLACE_EXISTING);} catch (IOException e) {throw new RuntimeException(e);} finally {// 释放缓冲区内存DataBufferUtils.release(dataBuffer);}});}).then(Mono.just("Large file uploaded"));
}
关键概念
PartEvent
:Spring 5.3+ 新增的 API,表示多部分请求中的每个事件(如文件头、数据块、请求结束)。windowUntil
:将数据流按条件分块(此处按isLast
判断是否结束)。concatMap
:保证块按顺序处理。DataBufferUtils.join
:将多个DataBuffer
合并为一个。
这里的后端代码是基于 Spring WebFlux 的流式分块接收(PartEvent
),其设计初衷是配合支持 HTTP/2 流式上传的前端框架(如浏览器中的 Fetch API + Streams API)。但微信小程序的网络请求 API 存在以下限制:
-
不支持流式上传:小程序
wx.request
和wx.uploadFile
API 只能发送完整请求体,无法直接发送Flux<DataBuffer>
流。 -
无法触发
PartEvent
分块逻辑:后端windowUntil(PartEvent::isLast)
依赖前端按流式分块发送数据,而小程序只能发送完整的 multipart/form-data 请求。
问题点 | 原 Spring WebFlux 接口 | 适配小程序方案 |
---|---|---|
分块触发方式 | 依赖流式 PartEvent 自动分块 | 前端手动切割文件并多次调用上传接口 |
请求类型 | 单一请求持续发送流式数据 | 多个独立请求 |
后端存储逻辑 | 流式写入同一文件 | 按分块编号存储临时文件 |
断点续传支持 | 需自行实现 | 通过记录已上传分块实现 |
添加进度监控
AtomicLong bytesWritten = new AtomicLong(0);.flatMap(dataBuffer -> {long currentBytes = dataBuffer.readableByteCount();bytesWritten.addAndGet(currentBytes);System.out.println("Progress: " + bytesWritten.get() + " bytes");// ...原有写入逻辑...
})
6.2. 文件下载
6.2.1 下载静态文件
@GetMapping("/download/{filename:.+}")
public Mono<Void> downloadFile(@PathVariable String filename, ServerHttpResponse response
) {// 1. 构建文件路径Path filePath = Paths.get("uploads", filename);Resource resource = new UrlResource(filePath.toUri());// 2. 检查文件是否存在if (!resource.exists() || !resource.isReadable()) {return Mono.error(new FileNotFoundException("File not found"));}// 3. 设置响应头response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);response.getHeaders().setContentDisposition(ContentDisposition.attachment().filename(filename).build());// 4. 零拷贝写入响应ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;return zeroCopyResponse.writeWith(resource, 0, resource.contentLength());
}
浏览器行为:
-
Content-Type: application/octet-stream
:告诉浏览器这是一个二进制文件,无法直接预览。 -
Content-Disposition: attachment; filename="file.txt"
:强制浏览器弹出下载对话框。 -
这是 HTTP 协议的标准行为,所有遵循协议的客户端(包括 Postman)都会根据这些头处理响应。
非浏览器客户端(如微信小程序):
-
微信小程序不会自动触发下载,需要开发者手动处理:
-
小程序前端通过
wx.downloadFile
API 下载文件。 -
开发者需将文件 URL 返回给前端,前端根据业务逻辑自行处理文件(如保存到本地、预览等)。
-
-
关键点:无论客户端类型,设置这些头是服务端的责任,但实际下载行为由客户端实现。
示例代码(小程序端):
wx.downloadFile({url: 'https://your-api.com/download/file.txt',success(res) {wx.saveFile({tempFilePath: res.tempFilePath,success(savedRes) {console.log('文件保存成功:', savedRes.savedFilePath);}});}
});
为什么下载可以使用零拷贝(从云存储中下载不行),而上传不行?
-
下载场景:
-
零拷贝机制:下载时,文件内容需要从磁盘读取并通过网络发送到客户端。零拷贝技术(如 Linux 的
sendfile
)允许数据直接从磁盘文件描述符传输到网络套接字,绕过用户空间的内存复制,显著减少 CPU 和内存开销。 -
适用性:适用于服务器主动发送静态文件的场景。
-
-
上传场景:
-
数据流向:上传时,客户端将文件数据通过 HTTP 请求发送到服务器,服务器需要接收数据并处理(如保存到磁盘或云存储)。
-
技术限制:上传的数据来自网络流,必须经过内核缓冲区 → 用户空间 → 磁盘/云存储的路径,无法绕过用户空间直接写入目标位置。
-
结论:上传无法直接使用零拷贝技术,但可通过流式处理(
Flux<DataBuffer>
)减少内存占用。
-
-
若文件存储在非本地磁盘(如云存储),需直接返回 URL,无法使用零拷贝。
6.2.2 动态生成文件并下载
@GetMapping("/generate-csv")
public Mono<Void> generateCsv(ServerHttpResponse response) {// 1. 设置响应头response.getHeaders().setContentType(MediaType.TEXT_PLAIN);response.getHeaders().setContentDisposition(ContentDisposition.attachment().filename("data.csv").build());// 2. 生成 CSV 数据流Flux<String> csvLines = Flux.just("Name,Age", "Alice,30", "Bob,25");// 3. 将字符串转换为字节缓冲区并写入响应return response.writeWith(csvLines.map(line -> response.bufferFactory().wrap(line.getBytes(StandardCharsets.UTF_8))));
}
-
设置响应头,指定文件名和内容类型。
-
创建包含 CSV 行的
Flux
。 -
将每行字符串转换为
DataBuffer
并写入响应。
6.2.3 断点续传(Range 请求支持)
作用:
-
允许客户端在下载中断后,从断点继续下载,避免重新传输整个文件。
-
节省带宽和时间,提升大文件下载体验。
流程:
-
客户端发起请求:通过
Range
头指定需要下载的字节范围(如Range: bytes=0-999
)。 -
服务端响应:
-
检查文件是否存在,解析
Range
头。 -
返回状态码
206 Partial Content
和Content-Range
头(如Content-Range: bytes 0-999/2000
)。 -
仅传输指定范围的字节。
-
-
客户端处理:
-
将接收到的数据追加到本地临时文件。
-
如果下载中断,记录已下载的字节位置,下次请求时从该位置继续。
-
@GetMapping("/download-large/{filename:.+}")
public Mono<Void> downloadLargeFile(@PathVariable String filename, ServerHttpRequest request, ServerHttpResponse response) {Path filePath = Paths.get("uploads", filename);Resource resource = new UrlResource(filePath.toUri());if (!resource.exists() || !resource.isReadable()) {return Mono.error(new FileNotFoundException("File not found: " + filename));}// 解析 Range 头List<HttpRange> ranges = request.getHeaders().getRange();long fileSize = resource.contentLength();if (ranges.isEmpty()) {// 完整下载response.getHeaders().setContentLength(fileSize);return response.writeWith(DataBufferUtils.read(resource, response.bufferFactory(), 4096));} else {// 处理分块请求HttpRange range = ranges.get(0);long start = range.getRangeStart(fileSize);long end = range.getRangeEnd(fileSize);long length = end - start + 1;response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);response.getHeaders().setContentLength(length);response.getHeaders().set(HttpHeaders.CONTENT_RANGE, "bytes " + start + "-" + end + "/" + fileSize);response.setStatusCode(HttpStatus.PARTIAL_CONTENT);return response.writeWith(DataBufferUtils.read(resource, response.bufferFactory(), 4096, start));}
}
6.3. 错误处理
6.3.1 全局异常处理
@ControllerAdvice
public class FileExceptionHandler {@ExceptionHandler(FileNotFoundException.class)public ResponseEntity<Mono<String>> handleFileNotFound(FileNotFoundException ex) {return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Mono.just(ex.getMessage()));}@ExceptionHandler(DataBufferLimitException.class)public ResponseEntity<Mono<String>> handleSizeExceeded(DataBufferLimitException ex) {return ResponseEntity.status(HttpStatus.PAYLOAD_TOO_LARGE).body(Mono.just("File size exceeds limit"));}
}
6.3.2 上传文件大小限制错误
在 application.yml
中配置错误页面(可选):
spring:webflux:multipart:max-file-size: 10MBmax-request-size: 20MB
如果文件超过 max-file-size
,Spring 会抛出 DataBufferLimitException
。
如果请求总大小超过 max-request-size
,抛出 MaxUploadSizeExceededException
。
处理方式:
-
全局异常处理:
@ExceptionHandler(DataBufferLimitException.class) public ResponseEntity<String> handleFileSizeExceeded() {return ResponseEntity.status(HttpStatus.PAYLOAD_TOO_LARGE).body("File size exceeds limit"); }
-
前端提示:返回 HTTP 413 状态码,提示用户压缩文件或分块上传。
6.4. 安全与优化
6.4.1 文件名安全处理
private String sanitizeFilename(String filename) {// 移除路径信息String safeName = filename.replaceAll("^.*[\\\\/]", "");// 替换非法字符return safeName.replaceAll("[^a-zA-Z0-9.-]", "_");
}
6.4.2 病毒扫描(集成 ClamAV)
ClamAV 的作用
-
目的:检测上传文件是否包含病毒或恶意软件。
-
原理:ClamAV 是一个开源杀毒引擎,通过 TCP 连接调用其守护进程(clamd)进行扫描。
// 1. 添加 ClamAV 客户端依赖
<dependency><groupId>com.github.clam</groupId><artifactId>clam-client</artifactId><version>1.0.0</version>
</dependency>// 2. 扫描服务类
public class ClamAvService {private final ClamAVClient clamAVClient;public ClamAvService() {this.clamAVClient = new ClamAVClient("localhost", 3310);}public Mono<Boolean> scanFile(Path filePath) {return Mono.fromCallable(() -> {byte[] reply = clamAVClient.scan(filePath);return ClamAVClient.isCleanReply(reply);}).subscribeOn(Schedulers.boundedElastic()); // 在阻塞线程池执行}
}// 3. 在控制器中使用
@PostMapping("/safe-upload")
public Mono<String> safeUpload(@RequestPart("file") FilePart filePart) {Path path = Paths.get("uploads", sanitizeFilename(filePart.filename()));return DataBufferUtils.write(filePart.content(), path).then(clamAvService.scanFile(path)).flatMap(isClean -> {if (!isClean) {Files.delete(path); // 删除染毒文件return Mono.error(new VirusFoundException());}return Mono.just("File is safe");});
}
6.4.3 异步存储到云服务(如 COS)
@Service
public class CosService {@Autowiredprivate COSClient cosClient;@Value("${tencent.cos.bucket}")private String bucketName;public Mono<String> uploadToCos(FilePart filePart) {String fileName = sanitizeFilename(filePart.filename());String cosKey = "uploads/" + fileName;// 将文件内容转换为字节流Flux<DataBuffer> contentFlux = filePart.content();Flux<byte[]> bytesFlux = contentFlux.map(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);return bytes;});// 异步上传return Mono.create(sink -> {TransferManager transferManager = new TransferManager(cosClient);Upload upload = transferManager.upload(bucketName, cosKey, Channels.newChannel(new FluxInputStream(bytesFlux)), // 自定义输入流new ObjectMetadata());upload.addProgressListener((ProgressListener) progressEvent -> {System.out.println("Upload progress: " + progressEvent.getBytesTransferred());});try {upload.waitForUploadResult();sink.success("https://" + bucketName + ".cos." + region + ".myqcloud.com/" + cosKey);} catch (InterruptedException e) {sink.error(e);}}).subscribeOn(Schedulers.boundedElastic());}
}// 自定义 Flux 转 InputStream
class FluxInputStream extends InputStream {private final Iterator<byte[]> iterator;private ByteArrayInputStream currentStream;public FluxInputStream(Flux<byte[]> flux) {this.iterator = flux.toIterable().iterator();}@Overridepublic int read() throws IOException {if (currentStream == null || currentStream.available() == 0) {if (!iterator.hasNext()) return -1;currentStream = new ByteArrayInputStream(iterator.next());}return currentStream.read();}
}
-
将上传文件的
Flux<DataBuffer>
转换为Flux<byte[]>
。 -
通过
TransferManager
异步上传到 COS。 -
添加进度监听器。
-
返回上传后的文件 URL。
@PostMapping("/upload-to-cos")
public Mono<String> uploadToCos(@RequestPart("file") FilePart filePart) {return cosService.uploadToCos(filePart).onErrorResume(e -> Mono.just("Upload failed: " + e.getMessage()));
}
6.5. 测试文件上传与下载
6.5.1 上传测试
@SpringBootTest
@AutoConfigureWebTestClient
class FileControllerTest {@Autowiredprivate WebTestClient webTestClient;@Testvoid testFileUpload() {Resource file = new FileSystemResource("test.txt");webTestClient.post().uri("/upload").contentType(MediaType.MULTIPART_FORM_DATA).bodyValue(generateMultipartBody(file)).exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("File uploaded: test.txt");}private MultiValueMap<String, HttpEntity<?>> generateMultipartBody(Resource file) {MultipartBodyBuilder builder = new MultipartBodyBuilder();builder.part("file", file).filename("test.txt");return builder.build();}
}
6.5.2 下载测试
@Test
void testFileDownload() throws IOException {Path testFile = Files.write(Paths.get("uploads/test.txt"), "Hello World".getBytes());webTestClient.get().uri("/download/test.txt").exchange().expectStatus().isOk().expectHeader().contentType(MediaType.APPLICATION_OCTET_STREAM).expectHeader().contentDisposition(ContentDisposition.attachment().filename("test.txt").build()).expectBody(String.class).isEqualTo("Hello World");Files.delete(testFile);
}
6.6. 性能优化
6.6.1 调整缓冲区大小
在 application.yml
中配置:
spring:webflux:max-in-memory-size: 1MB # 内存缓冲区大小(超过部分写入磁盘)
-
内存缓冲区大小:控制文件上传或请求体解析时,内存中缓存的最大数据量。
-
设计目的:防止大文件上传占用过多内存,导致应用 OOM(内存溢出)。
超限处理流程
-
小文件上传(<1MB):
-
整个文件内容缓存在内存中,快速处理。
-
-
大文件上传(>1MB):
-
内存部分:前 1MB 数据保留在内存缓冲区。
-
超出部分:自动写入磁盘临时文件(路径由
java.io.tmpdir
指定)。 -
最终处理:所有数据块(内存 + 磁盘临时文件)合并后传递给业务代码。
-
代码示例与验证
@PostMapping("/upload")
public Mono<String> upload(@RequestPart("file") FilePart filePart) {// 即使上传 100MB 文件,内存占用不会超过 1MBreturn DataBufferUtils.write(filePart.content(), Paths.get("uploads/file.dat")).then(Mono.just("Upload success"));
}
注意事项
-
临时文件清理:Spring 不会自动删除临时文件,需配置定时任务或钩子清理。
-
性能影响:频繁的磁盘 I/O 可能降低性能,建议根据服务器内存调整此值(如设置为 10MB)。
6.6.2 分块传输编码
对于大文件下载,自动启用分块传输:
response.getHeaders().set(HttpHeaders.TRANSFER_ENCODING, "chunked");
6.7. 完整配置示例
6.7.1 文件存储目录自动创建
@PostConstruct
public void init() throws IOException {Path uploadDir = Paths.get("uploads");if (!Files.exists(uploadDir)) {Files.createDirectories(uploadDir);}
}
6.7.2 清理临时文件
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.nio.file.*;@Configuration
public class TempFileCleanup {private Path uploadDir = Paths.get("uploads");// 启动时创建目录@PostConstructpublic void init() throws IOException {if (!Files.exists(uploadDir)) {Files.createDirectories(uploadDir);}}// 关闭时清理文件@PreDestroypublic void cleanup() throws IOException {Files.walk(uploadDir).filter(Files::isRegularFile).forEach(path -> {try {Files.delete(path);System.out.println("Deleted: " + path);} catch (IOException e) {System.err.println("Failed to delete: " + path);}});}
}
-
生产环境:建议使用定时任务清理过期文件(如 Quartz 或 Spring Scheduler)。
6.8 总结
功能 | 实现方案 |
---|---|
文件上传 | FilePart + DataBufferUtils.write |
文件下载 | Resource + 零拷贝写入 |
大文件处理 | 分块上传(PartEvent )、断点续传(Range 头) |
安全性 | 文件名清洗、病毒扫描 |
云存储集成 | 腾讯云(COS) |
性能优化 | 零拷贝、分块传输、堆外内存配置 |
测试 | WebTestClient 模拟多部分请求 |
7. WebSocket 集成
7.1. 项目初始化与依赖配置
创建 Spring Boot 项目
使用 Spring Initializr 生成项目,选择以下依赖:
-
Spring Reactive Web(包含 WebFlux)
-
Lombok(简化代码,可选)
添加必要依赖
确保 pom.xml
包含 WebFlux 依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
7.2. 配置 WebSocket 路由
定义路由映射
创建配置类 WebSocketConfig.java
,将 WebSocket 路径映射到自定义的 WebSocketHandler
:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;@Configuration
public class WebSocketConfig {@Beanpublic HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) {Map<String, WebSocketHandler> pathMap = new HashMap<>();pathMap.put("/ws/chat", webSocketHandler); // 绑定路径到处理器SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();mapping.setUrlMap(pathMap);mapping.setOrder(-1); // 设置最高优先级return mapping;}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
-
关键点:
-
SimpleUrlHandlerMapping
将 URL 路径/ws/chat
映射到自定义的WebSocketHandler
。 -
WebSocketHandlerAdapter
是 Spring WebFlux 处理 WebSocket 的核心适配器。
-
7.3. 实现 WebSocketHandler
自定义处理器
创建 ChatWebSocketHandler.java
,实现 WebSocketHandler
接口,管理会话和处理消息:
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;@Component
public class ChatWebSocketHandler implements WebSocketHandler {// 使用线程安全的集合存储所有活跃会话private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();@Overridepublic Mono<Void> handle(WebSocketSession session) {// 1. 新连接加入会话池sessions.add(session);System.out.println("[连接建立] 会话ID: " + session.getId());// 2. 处理客户端发送的消息流return session.receive() // 返回 Flux<WebSocketMessage>.flatMap(message -> processMessage(message, session)).doFinally(signal -> {// 3. 连接关闭时清理会话sessions.remove(session);System.out.println("[连接关闭] 会话ID: " + session.getId());}).then(); // 返回 Mono<Void> 表示处理完成}// 处理单条消息private Mono<Void> processMessage(WebSocketMessage message, WebSocketSession sender) {if (message.getType() == WebSocketMessage.Type.TEXT) {String payload = message.getPayloadAsText();System.out.println("[收到消息] 来自 " + sender.getId() + ": " + payload);return broadcast(payload, sender); // 广播消息}return Mono.empty(); // 忽略非文本消息}// 广播消息给所有其他用户private Mono<Void> broadcast(String message, WebSocketSession sender) {return Flux.fromIterable(sessions).filter(session -> !session.equals(sender)) // 排除发送者.flatMap(session -> session.send(Mono.just(session.textMessage(formatMessage(message, sender))))).then(); // 合并所有发送操作为一个 Mono<Void>}// 格式化消息(可自定义协议)private String formatMessage(String message, WebSocketSession sender) {return String.format("[用户 %s]: %s", sender.getId(), message);}
}
代码逻辑详解
-
会话管理:
-
sessions
使用CopyOnWriteArrayList
,保证线程安全(读多写少场景)。 -
新连接建立时,
sessions.add(session)
将会话加入列表。 -
连接关闭时,
doFinally
回调中移除会话,防止内存泄漏。
-
-
消息处理:
-
session.receive()
返回Flux<WebSocketMessage>
,表示客户端发送的消息流。 -
flatMap
对每条消息进行处理,支持非阻塞操作。 -
processMessage
处理文本消息,忽略二进制等其他类型消息。
-
-
广播机制:
-
Flux.fromIterable(sessions)
遍历所有会话。 -
filter
排除发送者,避免消息回传。 -
session.send(...)
发送消息,Mono.just(...)
创建文本消息。
-
-
响应式流控制:
-
flatMap
将每条消息转换为广播操作流。 -
then()
将多个异步操作合并为Mono<Void>
,表示整体完成。
-
7.4. 高级扩展功能
握手阶段鉴权
在握手阶段验证 Token 或用户身份:
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;public class CustomHandshakeHandler extends HandshakeWebSocketService {public CustomHandshakeHandler(RequestUpgradeStrategy upgradeStrategy) {super(upgradeStrategy);}@Overridepublic Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {// 从请求头获取 TokenHttpHeaders headers = exchange.getRequest().getHeaders();String token = headers.getFirst("Authorization");if (!isValidToken(token)) {return Mono.error(new SecurityException("无效的 Token"));}// 继续处理握手return super.handleRequest(exchange, handler);}private boolean isValidToken(String token) {// 实现 Token 验证逻辑return true;}
}
在配置类中替换默认的 WebSocketService
:
@Bean
public WebSocketService webSocketService() {return new CustomHandshakeHandler(new ReactorNettyRequestUpgradeStrategy());
}
心跳检测
配置底层 Netty 心跳机制:
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.netty.http.server.HttpServer;@Configuration
public class NettyConfig {@Beanpublic WebServerFactoryCustomizer<NettyReactiveWebServerFactory> webServerFactoryCustomizer() {return factory -> factory.addServerCustomizers(httpServer -> httpServer.tcpConfiguration(tcpServer ->tcpServer.option(ChannelOption.SO_KEEPALIVE, true)));}
}
消息协议设计
使用 JSON 格式传递结构化数据:
import com.fasterxml.jackson.databind.ObjectMapper;// 消息实体类
@Data // Lombok 注解
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {private String sender;private String content;private long timestamp;
}// 修改广播方法
private Mono<Void> broadcast(String rawMessage, WebSocketSession sender) {ObjectMapper mapper = new ObjectMapper();try {ChatMessage message = new ChatMessage(sender.getId(),rawMessage,System.currentTimeMillis());String json = mapper.writeValueAsString(message);return Flux.fromIterable(sessions).filter(session -> !session.equals(sender)).flatMap(session -> session.send(Mono.just(session.textMessage(json)))).then();} catch (Exception e) {return Mono.error(e);}
}