Spring WebFlux

1. 响应式编程基础


1.1. 什么是响应式编程?

响应式编程是一种编程范式,专注于数据流和变化传播。它的核心思想是:

  • 数据流:将数据看作流动的序列(Stream),可以是有限的(如列表)或无限的(如实时事件流)。

  • 异步非阻塞:通过异步方式处理数据流,避免阻塞线程,提高资源利用率。

  • 变化传播:当数据流中的某个数据发生变化时,自动通知依赖它的组件进行更新。

响应式编程的目标是构建高效、弹性、可扩展的系统,特别适合处理高并发、实时数据处理的场景。


1.2. 阻塞式与非阻塞式的区别

阻塞式(Blocking)

  • 特点

    • 线程在执行任务时会一直等待,直到任务完成。

    • 例如,传统的 Servlet 模型在处理请求时,线程会一直阻塞,直到 I/O 操作(如数据库查询、网络请求)完成。

  • 缺点

    • 线程资源浪费,无法高效处理高并发请求。

    • 每个请求都需要一个独立的线程,线程数量受限于操作系统。

非阻塞式(Non-blocking)

  • 特点

    • 线程不会等待任务完成,而是继续执行其他任务。

    • 例如,在响应式编程中,I/O 操作会立即返回一个 Mono 或 Flux,线程可以继续处理其他任务,等到 I/O 操作完成后再通知线程处理结果。

  • 优点

    • 线程资源利用率高,适合高并发场景。

    • 少量线程即可处理大量请求。


1.3. Reactive Streams 标准

Reactive Streams 是一个规范,定义了响应式编程的核心接口和规则,目的是实现异步数据流的标准化。它的核心组件包括:

  • Publisher(发布者)

    • 数据流的源头,负责生成数据。

    • 例如,Flux 和 Mono 都是 Publisher 的实现。

  • Subscriber(订阅者)

    • 数据流的消费者,负责处理数据。

    • 通过 onNextonErroronComplete 方法接收数据流的事件。

  • Subscription(订阅)

    • 连接 Publisher 和 Subscriber 的桥梁。

    • 通过 request(n) 方法控制数据流的请求量(背压机制)。

  • Backpressure(背压)

    • 一种流量控制机制,用于解决生产者和消费者速度不匹配的问题。

    • 消费者通过 Subscription 告诉生产者需要多少数据,避免数据积压。


1.4. Reactive Streams 与 Spring WebFlux 的关系

Spring WebFlux 是 Spring 5 引入的响应式 Web 框架,它的核心设计基于 Reactive Streams 标准。以下是它们之间的关系:

Spring WebFlux 的响应式模型

  • 基于 Reactive Streams

    • Spring WebFlux 使用 Reactor 库(实现了 Reactive Streams 标准)作为其响应式编程的核心。

    • Reactor 提供了 Mono 和 Flux 两种数据类型,分别表示单值流和多值流。

  • 非阻塞 I/O

    • Spring WebFlux 使用非阻塞 I/O 模型,能够高效处理高并发请求。

    • 例如,WebFlux 的 WebClient 是一个非阻塞的 HTTP 客户端,基于 Reactive Streams 实现。

WebFlux 的核心组件

  • Handler 和 Router

    • WebFlux 提供了两种编程模型:基于注解的控制器和函数式端点。

    • 无论是哪种模型,底层都依赖于 Reactive Streams 的 Publisher 和 Subscriber

  • WebClient

    • WebFlux 的 WebClient 是一个非阻塞的 HTTP 客户端,基于 Reactive Streams 实现。

    • 它返回 Mono 或 Flux,支持背压机制。

背压机制的应用

  • WebFlux 的背压支持

    • WebFlux 在处理数据流时,会自动应用背压机制,避免生产者过快导致消费者无法处理。

    • 例如,当客户端请求大量数据时,WebFlux 会根据消费者的处理能力动态调整数据流的发送速度。


1.5. 响应式编程的应用场景

响应式编程特别适合以下场景:

  • 高并发请求

    • 例如,Web 服务器需要同时处理成千上万的请求。

  • 实时数据处理

    • 例如,实时监控系统、股票行情推送、聊天应用。

  • 异步任务处理

    • 例如,批量文件处理、异步消息队列。

  • 流式数据处理

    • 例如,处理大文件或无限数据流(如日志流)。


1.6. 响应式编程与传统编程模型的对比

  • 传统编程模型(阻塞式)

    • 同步阻塞:线程会一直等待任务完成。

    • 回调地狱:嵌套的回调函数导致代码难以维护。

    • 资源浪费:线程在等待时无法处理其他任务。

  • 响应式编程模型(非阻塞式)

    • 异步非阻塞:线程不会等待任务完成,而是继续处理其他任务。

    • 声明式编程:通过操作符(如 mapflatMap)描述数据流的处理逻辑,代码更简洁。

    • 资源高效:线程资源利用率高,适合高并发场景。

示例对比:

  • 传统编程(阻塞式)

    String result = blockingHttpClient.get("https://example.com"); // 阻塞线程
    System.out.println(result);
  • 响应式编程(非阻塞式)

    Mono<String> result = webClient.get().uri("https://example.com").retrieve().bodyToMono(String.class); // 非阻塞
    result.subscribe(System.out::println); // 异步处理结果

1.7. Reactor 的调试工具

Reactor 提供了丰富的调试工具,帮助开发者理解和调试响应式流:

  • log() 操作符:在数据流的每个阶段打印日志,方便观察数据流的执行过程。

Flux.just("apple", "banana", "cherry").log() // 打印日志.map(String::toUpperCase).subscribe();

输出:

INFO  - onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
INFO  - request(unbounded)
INFO  - onNext(apple)
INFO  - onNext(banana)
INFO  - onNext(cherry)
INFO  - onComplete()
  • Hooks.onOperatorDebug():启用调试模式,打印更详细的堆栈信息。

Hooks.onOperatorDebug();
Flux.just(1, 2, 3).map(n -> n / 0) // 模拟错误.subscribe();
  • StepVerifier:用于测试响应式流的行为。

StepVerifier.create(Flux.just(1, 2, 3)).expectNext(1, 2, 3).verifyComplete();

2. WebFlux 核心功能

2.1 WebFlux 注解模型

Spring WebFlux 提供了基于注解的编程模型,类似于 Spring MVC,但支持响应式编程。以下是 WebFlux 注解模型的详细讲解:


2.1.1. 注解驱动开发

WebFlux 的注解模型与 Spring MVC 类似,但支持响应式类型(如 Mono 和 Flux)。以下是核心注解及其用法:


核心注解

  • @RestController

    • 用于标记一个类为控制器,处理 HTTP 请求。

    • 返回的数据会自动序列化为 JSON 或 XML。

@RestController
public class MyController {@GetMapping("/hello")public Mono<String> hello() {return Mono.just("Hello, WebFlux!");}
}
  • @RequestMapping

    • 用于映射 HTTP 请求到控制器方法。

    • 可以指定路径、请求方法、请求头等。

@RestController
@RequestMapping("/api")
public class MyController {@RequestMapping(value = "/hello", method = RequestMethod.GET)public Mono<String> hello() {return Mono.just("Hello, WebFlux!");}
}

HTTP 方法注解

  • @GetMapping

    • 处理 HTTP GET 请求。

@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id);
}
  • @PostMapping

    • 处理 HTTP POST 请求。

@PostMapping("/users")
public Mono<User> createUser(@RequestBody User user) {return userService.saveUser(user);
}
  • @PutMapping@DeleteMapping@PatchMapping

    • 分别用于处理 HTTP PUT、DELETE 和 PATCH 请求。


参数绑定注解

  • @PathVariable

    • 用于从 URL 路径中提取变量。

@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id);
}
  • @RequestParam

    • 用于从查询参数中提取值。

@GetMapping("/users")
public Flux<User> getUsers(@RequestParam String name) {return userService.findUsersByName(name);
}
  • @RequestBody

    • 用于将请求体绑定到方法参数。

@PostMapping("/users")
public Mono<User> createUser(@RequestBody User user) {return userService.saveUser(user);
}
  • @ModelAttribute

    • 用于将表单数据绑定到对象。

@PostMapping("/users")
public Mono<User> createUser(@ModelAttribute User user) {return userService.saveUser(user);
}
  • @SessionAttribute

    • 用于从会话中提取属性。

@GetMapping("/profile")
public Mono<User> getProfile(@SessionAttribute User user) {return Mono.just(user);
}

请求头和 Cookie 注解

  • @RequestHeader

    • 用于从请求头中提取值。

@GetMapping("/hello")
public Mono<String> hello(@RequestHeader("User-Agent") String userAgent) {return Mono.just("User-Agent: " + userAgent);
}
  • @CookieValue

    • 用于从 Cookie 中提取值。

@GetMapping("/hello")
public Mono<String> hello(@CookieValue("JSESSIONID") String sessionId) {return Mono.just("Session ID: " + sessionId);
}

使用 ServerWebExchange 获取完整的请求上下文

  • ServerWebExchange

    • 提供了对 HTTP 请求和响应的完全访问。

    • 可以获取请求头、Cookie、会话等信息。

@GetMapping("/info")
public Mono<String> getInfo(ServerWebExchange exchange) {String path = exchange.getRequest().getPath().value();return Mono.just("Request Path: " + path);
}

2.1.2. 控制器方法的返回类型

Mono<T> 和 Flux<T>

  • Mono<T>

    • 表示一个异步的单值流。

    • 适用于返回单个结果的场景。

@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id);
}
  • Flux<T>

    • 表示一个异步的多值流。

    • 适用于返回多个结果的场景。

@GetMapping("/users")
public Flux<User> getUsers() {return userService.findAllUsers();
}

与传统阻塞式返回类型的对比

  • 传统阻塞式返回类型(如 String

    • 线程会阻塞,直到任务完成。

@GetMapping("/hello")
public String hello() {return "Hello, World!";
}
  • 响应式返回类型(如 Mono<String>

    • 线程不会阻塞,可以继续处理其他任务。

@GetMapping("/hello")
public Mono<String> hello() {return Mono.just("Hello, WebFlux!");
}

2.1.3. 异常处理

自定义全局异常处理:@ControllerAdvice

  • @ControllerAdvice

    • 用于定义全局异常处理逻辑。

@ControllerAdvice
public class GlobalExceptionHandler {@ExceptionHandler(UserNotFoundException.class)public Mono<ResponseEntity<String>> handleUserNotFoundException(UserNotFoundException ex) {return Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND).body(ex.getMessage()));}
}

异步异常捕获

  • 在响应式编程中,异常可以通过 onErrorResumeonErrorReturn 等方法捕获和处理。

@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findUserById(id).onErrorResume(ex -> Mono.just(new User("default", "Default User")));
}

2.2 WebFlux 函数式模型

ServerRequest

  • 表示 HTTP 请求,提供了访问请求头、参数、正文等方法。

HandlerFunction<ServerResponse> helloHandler = request -> {String name = request.queryParam("name").orElse("World");return ServerResponse.ok().bodyValue("Hello, " + name + "!");
};

ServerResponse

  • 表示 HTTP 响应,提供了构建响应状态码、头信息、正文等方法。

HandlerFunction<ServerResponse> helloHandler = request -> ServerResponse.ok().bodyValue("Hello, WebFlux!");

2.1.1 HandlerFunction

定义

  • HandlerFunction 是一个函数式接口,用于处理 HTTP 请求并生成响应。

  • 它的核心方法是 Mono<ServerResponse> handle(ServerRequest request)

作用

  • 实现具体的业务逻辑。

  • 接收 ServerRequest,返回 ServerResponse

示例

HandlerFunction<ServerResponse> helloHandler = request -> ServerResponse.ok().bodyValue("Hello, WebFlux!");

这里的意思是创建了个HandlerFunction类型的变量 helloHandler,这个helloHandler是怎么生成的,是通过接受一个request作为参数,并且通过ServerResponse返回Mono<ServerResponse>。

2.2.1 RouterFunction

定义

  • RouterFunction 是一个函数式接口,用于定义路由规则。

  • 它的核心方法是 Mono<HandlerFunction> route(ServerRequest request)

作用

  • 将 HTTP 请求映射到对应的 HandlerFunction

  • 根据请求的路径、方法等条件,选择合适的 HandlerFunction 来处理请求。

RouterFunctions.route()

  • 定义

    • RouterFunctions.route 是一个静态方法,用于创建 RouterFunction

  • 参数

    • 第一个参数是请求谓词(如 GETPOST)。

    • 第二个参数是 HandlerFunction

RouterFunction<ServerResponse> route = RouterFunctions.route(GET("/hello"), request -> ServerResponse.ok().bodyValue("Hello, WebFlux!")
);

 这里的意思也是创建了个RouterFunction类型的变量route,调用了它自身的静态方法RouterFunctions.route(将/hello请求路由到这里),接受一个request作为参数,并且返回一个Mono<HandlerFunction>,这里第二行的 request -> ServerResponse.ok().bodyValue("Hello, WebFlux!")一般是用一个HandlerFunction类的对象来替代。


2.2.3. HandlerFunction 和 RouterFunction 的关系

核心关系

  • RouterFunction 是路由的入口

    • RouterFunction 负责根据请求的路径、方法等条件,选择合适的 HandlerFunction 来处理请求。

    • 它类似于一个路由器,将请求分发到对应的处理器。

  • HandlerFunction 是业务逻辑的实现

    • HandlerFunction 负责处理具体的业务逻辑,并生成响应。

    • 它类似于一个控制器方法,专注于处理请求并返回结果。

工作流程

  • 请求到达

    • 当一个 HTTP 请求到达时,WebFlux 会调用 RouterFunction 的 route 方法。

  • 路由匹配

    • RouterFunction 根据请求的路径、方法等条件,匹配对应的 HandlerFunction

  • 处理请求

    • 如果匹配成功,RouterFunction 返回对应的 HandlerFunction

    • WebFlux 调用 HandlerFunction 的 handle 方法处理请求。

  • 生成响应

    • HandlerFunction 处理请求并生成 ServerResponse

    • WebFlux 将 ServerResponse 转换为 HTTP 响应并返回给客户端。

示例代码

以下是一个完整的示例,展示了 RouterFunction 和 HandlerFunction 的协同工作:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.*;@Configuration
public class WebConfig {// 定义 HandlerFunctionHandlerFunction<ServerResponse> helloHandler = request -> ServerResponse.ok().bodyValue("Hello, WebFlux!");HandlerFunction<ServerResponse> goodbyeHandler = request -> ServerResponse.ok().bodyValue("Goodbye, WebFlux!");// 定义 RouterFunction@Beanpublic RouterFunction<ServerResponse> router() {return RouterFunctions.route(GET("/hello"), helloHandler).andRoute(GET("/goodbye"), goodbyeHandler);}
}
  • RouterFunction

    • 定义了两条路由规则:

      • /hello 请求由 helloHandler 处理。

      • /goodbye 请求由 goodbyeHandler 处理。

  • HandlerFunction

    • helloHandler 返回 "Hello, WebFlux!"。

    • goodbyeHandler 返回 "Goodbye, WebFlux!"。


2.3.4 RouterFunctions.nest() 实现路由嵌套

路由嵌套的作用

  • 将一组相关的路由规则嵌套在一个公共路径下,提高代码的可读性和可维护性。

  • 例如,将所有 /api 开头的请求嵌套在一起。

示例代码

@Bean
public RouterFunction<ServerResponse> nestedRouter() {return RouterFunctions.nest(path("/api"), // 公共路径RouterFunctions.route(GET("/users"), request -> ServerResponse.ok().bodyValue("User List")).andRoute(GET("/products"), request -> ServerResponse.ok().bodyValue("Product List")));
}
  • RouterFunctions.nest

    • 将 /users 和 /products 路由嵌套在 /api 路径下。

    • 最终的路由路径为:

      • /api/users

      • /api/products


2.3.5 HandlerFilterFunction 实现请求拦截和过滤

HandlerFilterFunction 的作用

  • 在请求到达 HandlerFunction 之前或之后执行逻辑。

  • 常见的应用场景包括:

    • 身份验证

    • 日志记录

    • 异常处理

示例代码

// 定义一个日志过滤器
HandlerFilterFunction<ServerResponse, ServerResponse> logFilter = (request, next) -> {System.out.println("Request Path: " + request.path());return next.handle(request);
};// 应用过滤器
@Bean
public RouterFunction<ServerResponse> filteredRouter() {return RouterFunctions.route(GET("/hello"), request -> ServerResponse.ok().bodyValue("Hello, WebFlux!")).filter(logFilter); // 添加过滤器
}
  • logFilter

    • 在请求到达 HandlerFunction 之前,打印请求路径。

    • 调用 next.handle(request) 继续处理请求。


3. WebFlux 的数据处理

单值流(Mono)和多值流(Flux)

  • Mono<T>
    表示异步序列中最多只包含一个元素(或一个错误)的流。
    适合处理如单个对象查询、单个文件上传结果等场景。

    Mono<String> mono = Mono.just("Hello, Mono!");
    mono.subscribe(System.out::println);
    
  • Flux<T>
    表示异步序列中零个或多个元素(或一个错误)的流。
    适合处理如列表查询、批量处理、流式数据处理等场景。

    Flux<Integer> flux = Flux.just(1, 2, 3, 4);
    flux.subscribe(System.out::println);
    

总结表格:最常用操作

操作Mono 示例Flux 示例
创建Mono.just("Hello")Flux.just("A", "B")
转换mono.map(s -> s.length())flux.map(s -> s.toUpperCase())
异步转换mono.flatMap(id -> findUser(id))flux.flatMap(s -> queryAsync(s))
过滤-flux.filter(s -> s.startsWith("A"))
错误处理mono.onErrorReturn("default")flux.onErrorResume(e -> backupFlux)
组合流Mono.zip(mono1, mono2)flux1.mergeWith(flux2)

3.1 Mono 操作

3.1.1. 创建 Mono

(1) Mono.just(T data)

  • 作用:包装一个已知的值。

  • 场景:当数据已经存在时。

Mono<String> mono = Mono.just("Hello"); // 直接包含"Hello"

(2) Mono.empty()

  • 作用:创建一个不发射任何数据的 Mono

  • 场景:表示一个空操作(如删除数据后无需返回内容)。

Mono<Void> deleteOperation = Mono.empty(); // 常用于删除操作

(3) Mono.error(Throwable error)

  • 作用:创建一个立即抛出错误的 Mono

  • 场景:快速失败(如参数校验失败)。

Mono<String> errorMono = Mono.error(new RuntimeException("Bad Request"));

3.1.2. 转换 Mono

(1) map(Function<T, R> mapper)

  • 作用同步转换数据。

  • 本质:直接对数据做计算,不涉及异步。

Mono<Integer> lengthMono = Mono.just("Hello").map(s -> s.length()); // 5

(2) flatMap(Function<T, Mono<R>> mapper)

  • 作用异步转换数据,返回新的 Mono

  • 本质:用于链式调用另一个异步操作(如数据库查询)。

Mono<User> userMono = userIdMono.flatMap(id -> userRepository.findById(id));

(3) then(Mono<V> other)

  • 作用:忽略当前结果,执行下一个 Mono

  • 场景:顺序执行不依赖前一个结果的操作。

Mono<Void> logOperation = Mono.just("Processed").then(Mono.fromRunnable(() -> System.out.println("Done")));

3.1.3. 错误处理

(1) onErrorReturn(T fallbackValue)

  • 作用:遇到错误时返回默认值。

  • 场景:快速降级。

Mono<Integer> safeNumber = Mono.just("abc").map(Integer::parseInt).onErrorReturn(0); // 解析失败返回0

(2) onErrorResume(Function<Throwable, Mono<T>> fallback)

  • 作用:捕获错误并返回另一个 Mono

  • 场景:复杂降级逻辑(如调用备用服务)。

Mono<User> user = userRepository.findById(id).onErrorResume(e -> backupUserRepository.findById(id));

3.2 Flux 操作

3.2.1. 创建 Flux

(1) Flux.just(T... data)

  • 作用:创建包含多个元素的流。

Flux<String> flux = Flux.just("A", "B", "C"); // 发射三个字符串

(2) Flux.fromIterable(Iterable<T> iterable)

  • 作用:从集合(如 List)创建流。

  • 场景:将已有数据转换为流。

List<String> list = Arrays.asList("A", "B");
Flux<String> flux = Flux.fromIterable(list); // 发射"A", "B"

(3) Flux.range(int start, int count)

  • 作用:生成整数序列。

Flux<Integer> numbers = Flux.range(1, 3); // 发射1, 2, 3

3.2.2. 转换 Flux

(1) map(Function<T, R> mapper)

  • 作用同步转换每个元素。

Flux<Integer> lengths = Flux.just("apple", "banana").map(String::length); // 5, 6

(2) flatMap(Function<T, Publisher<R>> mapper)

  • 作用异步转换每个元素为新的流,并合并所有流。

  • 场景:并行处理元素(如为每个ID查询数据库)。

Flux<String> results = Flux.just(1, 2, 3).flatMap(id -> userRepository.findById(id)); // 异步查询每个用户

3.2.3. 过滤和选择

(1) filter(Predicate<T> predicate)

  • 作用:过滤不符合条件的元素。

Flux<Integer> evenNumbers = Flux.range(1, 5).filter(n -> n % 2 == 0); // 2, 4

(2) take(long n)

  • 作用:取前 n 个元素。

  • 场景:分页查询或限制数据量。

Flux<Integer> firstTwo = Flux.range(1, 10).take(2); // 1, 2

3.2.4. 组合 Flux

(1) mergeWith(Publisher<T> other)

  • 作用:合并两个流,元素按实际到达顺序交错发射。

  • 场景:合并实时数据流(如多个传感器数据)。

Flux<String> flux1 = Flux.just("A", "B");
Flux<String> flux2 = Flux.just("C", "D");
Flux<String> merged = flux1.mergeWith(flux2); // 可能输出 A, C, B, D

(2) zipWith(Publisher<T2> other, BiFunction<T, T2, R> combinator)

  • 作用:将两个流的元素按位置一一合并。

  • 场景:组合关联数据(如用户名和年龄)。

Flux<String> names = Flux.just("John", "Jane");
Flux<Integer> ages = Flux.just(30, 25);
Flux<String> zipped = names.zipWith(ages, (name, age) -> name + ":" + age);
// 输出 John:30, Jane:25

3.2.5. 错误处理

(1) onErrorReturn(T fallbackValue)

  • 作用:遇到错误时返回默认值并终止流。

Flux<Integer> safeNumbers = Flux.just("1", "abc", "3").map(Integer::parseInt).onErrorReturn(-1); // 输出1, -1

(2) onErrorResume(Function<Throwable, Publisher<T>> fallback)

  • 作用:捕获错误并切换到备用流。

Flux<Integer> numbers = Flux.just(1, 2, 0).map(n -> 10 / n).onErrorResume(e -> Flux.just(-1, -2)); // 输出1, 5, -1, -2

3.3 核心理解要点

3.3.1 基础知识

1. Mono vs Flux

  • Mono:代表 0或1个元素 的流(如查询单个用户)。

  • Flux:代表 0到N个元素 的流(如查询用户列表)。

2. 同步 vs 异步

  • map:同步操作(立即执行,不切换线程)。

  • flatMap:异步操作(可能切换线程,返回新的流)。

3. 链式调用

所有操作符可以像流水线一样链式调用:

Flux.just("apple", "banana").filter(s -> s.length() > 5).map(String::toUpperCase).subscribe(System.out::println); // 输出 "BANANA"

3.3.2 订阅 subscribe() 

1. 什么是 subscribe()

  • 定义:subscribe() 是 订阅数据流 的方法,调用后数据流开始执行。

  • 核心作用:触发数据流的执行,并定义如何处理数据、错误和完成信号。

2. 为什么需要 subscribe()

  • 在响应式编程中,数据流是 惰性 的,只有调用 subscribe() 时,数据流才会开始执行。

  • 如果没有调用 subscribe(),数据流只是一段定义好的逻辑,不会实际运行。

3.subscribe() 的常用重载方法

Mono 和 Flux 的 subscribe() 方法有多个重载版本,以下是常用的两种:

(1) 无参数版本

  • 作用:只订阅数据流,不处理任何数据或事件。

Flux.just("A", "B", "C").subscribe();
  • 解释:数据流会执行,但不会对数据做任何处理。

(2) 带 Consumer<T> 参数

  • 作用:订阅并处理每个数据项。

Flux.just("A", "B", "C").subscribe(data -> System.out.println("Received: " + data));

(3)带 Consumer<T> 和 Consumer<Throwable> 参数

  • 作用:订阅并处理数据项和错误。

Flux.just("A", "B", "C").concatWith(Flux.error(new RuntimeException("Error"))).subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()));

(4)带 Consumer<T>Consumer<Throwable> 和 Runnable 参数

  • 作用:订阅并处理数据项、错误和完成信号。

Flux.just("A", "B", "C").subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Completed!"));

(5)使用 BaseSubscriber 自定义订阅者

  • 作用:完全控制订阅逻辑,包括 request 的数量,自定义背压逻辑。

Flux.range(1, 1000).subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 初始请求10个数据request(10);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("Received: " + value);// 每处理完一个数据,再请求1个request(1);}@Overrideprotected void hookOnComplete() {System.out.println("Completed!");}@Overrideprotected void hookOnError(Throwable throwable) {System.err.println("Error: " + throwable.getMessage());}});
  • 解释

    • hookOnSubscribe() 中调用 request(10),表示初始请求 10 个数据。

    • hookOnNext() 中调用 request(1),表示每处理完一个数据,再请求 1 个。

实际应用场景:分页查询

  • 场景:从数据库分页查询数据,每次请求固定数量的数据。

Flux<User> users = userRepository.findAll().limitRate(100); // 每次查询100条数据users.subscribe(user -> System.out.println("User: " + user));

3.3.3 背压(Backpressure)

1. 什么是背压?

  • 定义:背压是 数据流中生产者(Publisher)和消费者(Subscriber)之间的流量控制机制

  • 问题背景

    • 如果生产者生产数据的速度 > 消费者处理数据的速度,会导致消费者积压大量未处理的数据,最终可能内存溢出(OOM)。

    • 背压的作用是让消费者告诉生产者:“我处理不过来了,慢一点!”

2. 背压是自动的吗?

在 Reactor(Spring WebFlux 的底层库)中,背压是自动支持的

  • 当你使用 Flux 或 Mono 时,背压机制已经内置。

  • 消费者通过 Subscription.request(n) 告诉生产者需要多少数据。

Flux.range(1, 1000) // 生产者生成1到1000的数字.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 每次请求10个数据request(10);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("Received: " + value);// 每处理完一个数据,再请求1个request(1);}});
  • 这里消费者通过 request(n) 控制数据流的速度。

  • 一般subscribe方法内部都是不用自己重写的,只有在需要自定义的情况下才需要自己定义request(n),不然一般都是直接subscribe()。

3. 背压策略

(1)缓冲(Buffer):将未处理的数据缓存起来(默认策略)(onBackpressureBuffer())

  • Flux.range(1, 1000).onBackpressureBuffer(100) // 设置缓冲区大小为100.subscribe(n -> {Thread.sleep(10); // 模拟处理速度慢System.out.println(n);});

(2)丢弃(Drop):直接丢弃无法处理的数据onBackpressureDrop())

  • Flux.range(1, 1000).onBackpressureBuffer(100) // 设置缓冲区大小为100.subscribe(n -> {Thread.sleep(10); // 模拟处理速度慢System.out.println(n);});

(3)最新值(Latest):只保留最新的数据,丢弃旧数据onBackpressureLatest())

  • Flux.range(1, 1000).onBackpressureLatest() // 只保留最新数据.subscribe(n -> {Thread.sleep(10); // 模拟处理速度慢System.out.println(n);});

3.3.4 冷流和热流

1.冷流(Cold Stream)——像 DVD 播放器

冷流的特点

  • 每次订阅都从头开始:就像每次按播放键,DVD 都会从开头播放。

  • 数据独立:每个订阅者看到的数据是完整的、独立的。

实际场景:数据库查询

Flux<User> users = userRepository.findAll(); // 冷流// 第一次查询
users.subscribe(user -> System.out.println("用户1:" + user));// 第二次查询(重新执行SQL)
users.subscribe(user -> System.out.println("用户2:" + user));
  • 两次查询会执行两次 SQL,数据互不影响。


2.热流(Hot Stream)——像电视台直播

热流的特点

  • 数据实时广播:不管有没有人订阅,数据都在实时生成。

  • 共享数据源:所有订阅者看到的是同一份实时数据。

实际场景:股票价格推送

// 创建一个热流(每秒推送一个随机价格)
ConnectableFlux<Double> stockPrice = Flux.interval(Duration.ofSeconds(1)).map(i -> Math.random() * 100).publish(); // 转换为热流stockPrice.connect(); // 开始推送数据(即使没有订阅者)// 订阅者1(从当前时间点接收数据)
stockPrice.subscribe(price -> System.out.println("订阅者1:" + price));Thread.sleep(3000);// 订阅者2(3秒后加入,只能收到之后的数据)
stockPrice.subscribe(price -> System.out.println("订阅者2:" + price));

3.如何选择冷流 vs 热流?

场景选择原因
需要重复查询数据库冷流每次查询都要获取最新数据
实时聊天消息热流所有用户共享同一份实时消息
文件下载冷流每个下载请求独立处理
传感器数据监控热流数据持续生成,多个监控端共享

 3.3.5 自动订阅

定义

  • 自动订阅 是指框架(如 Spring WebFlux)在特定条件下自动调用 subscribe() 方法,触发数据流的执行。

  • 核心思想:开发者只需要定义数据流(如 Flux 或 Mono),框架会在合适的时机自动订阅并处理数据。

为什么需要自动订阅?

  • 简化代码:开发者不需要手动调用 subscribe(),减少样板代码。

  • 统一管理:框架可以更好地管理数据流的生命周期(如错误处理、背压控制等)。


Spring WebFlux 中的自动订阅

(1) HTTP 请求处理

  • 场景:当客户端发起 HTTP 请求时,Spring WebFlux 会自动订阅返回的 Flux 或 Mono

@RestController
public class UserController {@GetMapping("/users")public Flux<User> getUsers() {return userRepository.findAll(); // 返回一个 Flux<User>}
}
  • 解释

    • 当客户端访问 /users 时,Spring WebFlux 会自动调用 subscribe(),触发 userRepository.findAll() 的执行。

    • 数据流的结果会通过 HTTP 响应返回给客户端。

(2) WebSocket 通信

  • 场景:在 WebSocket 通信中,Spring WebFlux 会自动订阅数据流并推送给客户端。

@RestController
public class WebSocketController {@MessageMapping("chat")public Flux<String> chat(Flux<String> messages) {return messages.map(msg -> "Echo: " + msg); // 返回一个 Flux<String>}
}
  • 解释

    • 当客户端通过 WebSocket 发送消息时,Spring WebFlux 会自动订阅 messages 数据流。

    • 处理后的消息会通过 WebSocket 推送给客户端。

(3) 响应式数据库查询

  • 场景:在使用响应式数据库(如 R2DBC)时,Spring WebFlux 会自动订阅查询结果。

@Service
public class UserService {@Autowiredprivate UserRepository userRepository;public Flux<User> getAllUsers() {return userRepository.findAll(); // 返回一个 Flux<User>}
}
  • 解释

    • 当调用 getAllUsers() 时,Spring WebFlux 会自动订阅 userRepository.findAll()

    • 查询结果会通过数据流返回。


自动订阅的底层原理

(1) Spring WebFlux 的工作流程

  • 步骤 1:开发者定义数据流(如 Flux 或 Mono)。

  • 步骤 2:Spring WebFlux 在接收到请求时,自动调用 subscribe()

  • 步骤 3:数据流开始执行,结果通过 HTTP 响应或 WebSocket 返回。

(2) 自动订阅的触发条件

  • HTTP 请求:当控制器方法返回 Flux 或 Mono 时。

  • WebSocket 通信:当 @MessageMapping 方法返回 Flux 或 Mono 时。

  • 响应式数据库查询:当响应式仓库方法返回 Flux 或 Mono 时。

3.3.6 自定义 Netty 线程池

配置 EventLoopGroup 线程数

通过自定义 HttpServerResources,可以调整 EventLoopGroup 的线程数。

代码示例

import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;@Configuration
public class NettyConfig {@Beanpublic WebServerFactoryCustomizer<NettyReactiveWebServerFactory> nettyWebServerFactoryCustomizer() {return factory -> factory.addServerCustomizers(httpServer -> {// 自定义 EventLoopGroup 线程池LoopResources loopResources = LoopResources.create("custom-loop", 4, 8, true);// 自定义 Worker 线程池(处理业务逻辑)ThreadFactory workerThreadFactory = new ThreadFactory() {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "worker-thread-" + counter.incrementAndGet());}};return httpServer.protocol(HttpProtocol.HTTP11).runOn(loopResources) // 指定 EventLoopGroup.workerTaskExecutor(java.util.concurrent.Executors.newFixedThreadPool(16, workerThreadFactory));});}
}

参数说明

  • LoopResources.create("custom-loop", 4, 8, true)

    • 4:Boss Group 线程数(接收连接)。

    • 8:Worker Group 线程数(处理 I/O)。

    • true:是否使用守护线程。

  • workerTaskExecutor:自定义业务逻辑线程池,适合处理阻塞操作(如数据库访问)。


线程池选择策略

  • 纯非阻塞操作:使用默认的 EventLoop 线程(无需额外配置)。

  • 混合阻塞操作:使用 Schedulers.boundedElastic() 或自定义线程池隔离阻塞任务。

处理阻塞任务示例

import reactor.core.scheduler.Schedulers;public Mono<String> blockingOperation() {return Mono.fromCallable(() -> {// 模拟阻塞操作(如 JDBC 查询)Thread.sleep(1000);return "Blocking Result";}).subscribeOn(Schedulers.boundedElastic()); // 使用弹性线程池
}

4. Spring MVC转换成Spring WebFlux 

第一步:修改依赖

把 Spring MVC 的依赖换成 WebFlux:

<!-- 删除这个 -->
<!-- <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency> 
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency>
</dependencies>--><!-- 移除 spring-boot-starter-web,替换为 webflux --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- 响应式数据库驱动(例如 R2DBC) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>io.r2dbc</groupId><artifactId>r2dbc-postgresql</artifactId></dependency>

第二步: 修改 Controller层

  • Spring MVC直接返回 User 或 ResponseEntity<User>

  • WebFlux返回 Mono<ResponseEntity<User>> 或 Flux<ResponseEntity<User>>

Spring MVC 示例(阻塞式)

@GetMapping("/user/{id}")
public User getUser(@PathVariable String id) {return userService.findById(id); // 阻塞调用
}

Spring WebFlux 示例(非阻塞式)

@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {return userService.findById(id); // 返回 Mono<User>
}

关键修改点

  • 所有返回值必须包装为 Mono<T> 或 Flux<T>

  • ResponseEntity 需要包裹在 Mono 中:Mono<ResponseEntity<User>>

  • 使用 map 转换结果,defaultIfEmpty 处理空值。


第三步:修改 Service 层

  • Spring MVC:方法返回 User(阻塞操作)。

  • WebFlux:方法返回 Mono<User> 或 Flux<User>(非阻塞操作)。

阻塞式服务示例

public User findById(String id) {return userRepository.findById(id); // 阻塞的 JPA 调用
}

非阻塞式服务示例

public Mono<User> findById(String id) {return userRepository.findById(id); // 假设已使用响应式 Repository
}

第四步:修改数据库访问

  • 如果用的是 MySQL/PostgreSQL → 换成 R2DBC(响应式驱动)。

  • 如果用的是 MongoDB → 直接用 响应式 MongoDB

Spring MVC(阻塞式)

使用 JPA 的阻塞式接口:

public interface UserRepository extends JpaRepository<User, String> {
}

Spring WebFlux(非阻塞式)

使用 R2DBC 或响应式 MongoDB:

// R2DBC(关系型数据库)
public interface ReactiveUserRepository extends ReactiveCrudRepository<User, String> {
}// 响应式 MongoDB
public interface ReactiveUserRepository extends ReactiveMongoRepository<User, String> {
}

第五步:处理阻塞代码

1. 处理返回 Mono 的阻塞代码

场景:调用一个阻塞的同步方法(如 JDBC 查询、传统 HTTP 客户端),返回单个结果。

关键操作

  • Mono.fromCallable():将阻塞操作包装为 Mono

  • subscribeOn(Schedulers.boundedElastic()):指定阻塞操作在弹性线程池中执行,,避免占用事件循环线程。

示例

public Mono<User> findUserBlocking(String id) {return Mono.fromCallable(() -> {// 这是一个阻塞的数据库查询(如 JDBC)return jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = ?", User.class, id);}).subscribeOn(Schedulers.boundedElastic());
}

2. 处理返回 Flux 的阻塞代码

场景:调用一个阻塞的同步方法(如遍历文件内容、批量数据库查询),返回多个结果。

关键操作

  • Flux.generate() 或 Flux.create():生成数据流。

  • flatMapSequential 或 concatMap:保持数据顺序。

  • subscribeOn(Schedulers.boundedElastic()) 确保文件读取操作在弹性线程池中执行。

示例:遍历大文件并逐行处理

public Flux<String> readLargeFileBlocking(String filePath) {return Flux.generate(() -> Files.lines(Paths.get(filePath)).iterator(), (iterator, sink) -> {if (iterator.hasNext()) {sink.next(iterator.next());} else {sink.complete();}return iterator;}).subscribeOn(Schedulers.boundedElastic());
}

3. 通用原则

无论是 Mono 还是 Flux,处理阻塞代码的核心原则是相同的:

(1) 隔离阻塞操作

  • 使用 Schedulers.boundedElastic()

    • 专为阻塞操作设计的线程池,动态管理线程数量。

    • 避免使用 Schedulers.parallel()(用于 CPU 密集型任务)。

(2) 限制并发

  • 控制线程池大小:避免无限制创建线程。

    // 自定义线程池(可选)
    Scheduler blockingScheduler = Schedulers.newBoundedElastic(10,          // 最大线程数100,         // 任务队列容量"blocking-pool"
    );Mono.fromCallable(() -> blockingCall()).subscribeOn(blockingScheduler);

(3) 保持响应式链的纯净

  • 避免在 map/flatMap 中直接写阻塞代码

    // 错误示例:阻塞代码在事件循环中执行!
    Flux.range(1, 10).map(i -> {Thread.sleep(1000); // 阻塞操作return i * 2;});// 正确示例:隔离阻塞操作
    Flux.range(1, 10).flatMap(i -> Mono.fromCallable(() -> {Thread.sleep(1000); // 阻塞操作return i * 2;}).subscribeOn(Schedulers.boundedElastic()));

第六步:错误处理对比

Spring MVC(阻塞式)

使用 try-catch 或 @ExceptionHandler

@ExceptionHandler(Exception.class)
public ResponseEntity<String> handleError(Exception e) {return ResponseEntity.status(500).body("Error: " + e.getMessage());
}

Spring WebFlux(非阻塞式)

使用 Reactor 的错误处理操作符:

@GetMapping("/user/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {return userService.findById(id).map(user -> ResponseEntity.ok(user)).onErrorResume(e -> Mono.just(ResponseEntity.status(500).build())); // 异常时返回500
}

常用操作符

  • onErrorReturn(T fallback):直接返回默认值。

  • onErrorResume(Function<Throwable, Mono<T>> fallback):返回另一个 Mono

  • onErrorMap(Function<Throwable, Throwable> mapper):转换异常类型。

第七步:集合数据处理对比

Spring MVC(阻塞式)

返回 List<User>

@GetMapping("/users")
public List<User> getAllUsers() {return userService.findAll(); // 阻塞操作
}

Spring WebFlux(非阻塞式)

返回 Flux<User>

@GetMapping("/users")
public Flux<User> getAllUsers() {return userService.findAll(); // 非阻塞操作
}

流式响应示例

@GetMapping(value = "/users-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> getUsersStream() {return userService.findAll().delayElements(Duration.ofSeconds(1)); // 每秒发送一个用户
}

第八步:性能优化与注意事项

背压(Backpressure)管理

  • 默认策略:Reactor 会自动处理背压,但可以手动控制:

    Flux.range(1, 1000).onBackpressureBuffer(50) // 缓冲区大小为50.subscribe();
  • 策略选择

    • onBackpressureDrop():丢弃无法处理的数据。

    • onBackpressureLatest():只保留最新数据。

线程池配置

  • 非阻塞操作:使用默认的 Schedulers.parallel()

  • 阻塞操作:使用 Schedulers.boundedElastic()

第九步:测试代码对比

Spring MVC(阻塞式)

使用 MockMvc

@SpringBootTest
@AutoConfigureMockMvc
class UserControllerTest {@Autowiredprivate MockMvc mockMvc;@Testvoid testGetUser() throws Exception {mockMvc.perform(get("/user/1")).andExpect(status().isOk()).andExpect(jsonPath("$.name").value("Alice"));}
}

Spring WebFlux(非阻塞式)

使用 WebTestClient

@SpringBootTest
class UserControllerTest {@Autowiredprivate WebTestClient webTestClient;@Testvoid testGetUser() {webTestClient.get().uri("/user/1").exchange().expectStatus().isOk().expectBody().jsonPath("$.name").isEqualTo("Alice");}
}

5. WebClient 在 Spring WebFlux 中的使用

5.1. WebClient 的核心作用

WebClient 是 Spring WebFlux 提供的 非阻塞式 HTTP 客户端,用于替代传统的 RestTemplate

  • 非阻塞特性:基于 Reactor Netty,所有操作通过事件循环(Event Loop)异步执行,无需阻塞线程等待响应。

  • 响应式流支持:直接返回 Mono(单个结果)或 Flux(多个结果),与 Spring WebFlux 的响应式编程模型无缝集成。

  • 背压支持:自动控制数据流速率,避免生产者压垮消费者。

为什么需要从 RestTemplate 迁移到 WebClient?

  • 传统问题(Spring MVC + RestTemplate)

    • 阻塞式调用:每个 HTTP 请求占用一个线程,线程池资源有限(如 Tomcat 默认 200 线程),高并发时容易耗尽线程。

    • 同步处理:代码必须等待 HTTP 响应返回后才能继续执行,无法高效利用 CPU 和网络资源。

  • 解决方案(Spring WebFlux + WebClient)

    • 非阻塞 I/O:基于 Reactor Netty,使用事件循环模型,少量线程处理大量并发请求。

    • 响应式流整合:直接返回 Mono/Flux,与响应式服务无缝衔接。

    • 资源高效:无需为每个请求分配线程,适合微服务和云原生架构。


5.2. 创建与配置 WebClient

(1) 基础配置

// 默认配置(无基础 URL)
WebClient webClient = WebClient.create();// 自定义配置(推荐)
WebClient webClient = WebClient.builder().baseUrl("https://api.example.com")  // 设置基础 URL(后续请求可省略域名).defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) // 默认请求头.filter((request, next) -> {        // 全局过滤器(如日志、认证)System.out.println("Request URL: " + request.url());return next.exchange(request);}).build();

详细说明

  • baseUrl:所有请求的前缀,例如 /users 实际访问 https://api.example.com/users

  • defaultHeader:为所有请求添加默认请求头(如设置 Content-Type)。

  • filter:拦截请求和响应,可用于日志记录、添加认证令牌等。

(2) 配置超时和连接池

HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(5))  // 响应超时时间.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); // 连接超时时间(3秒)WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)) // 绑定 HttpClient.build();

参数说明

  • responseTimeout:等待服务器响应的最大时间,超时后抛出 ReadTimeoutException

  • CONNECT_TIMEOUT_MILLIS:建立 TCP 连接的超时时间。


5.3. 发起 HTTP 请求

(1) GET 请求:获取单个资源

Mono<User> userMono = webClient.get().uri("/users/{id}", 1)              // 路径参数.retrieve()                         // 发起请求并获取响应.bodyToMono(User.class);            // 将响应体解析为 User 对象// 调用示例
userMono.subscribe(user -> System.out.println("User: " + user),error -> System.err.println("Error: " + error)
);

详细说明

  • uri("/users/{id}", 1):路径参数占位符,{id} 会被替换为 1

  • retrieve():执行请求并获取响应。

  • bodyToMono(User.class):将响应体解析为 User 对象,返回 Mono<User>

(2) GET 请求:获取资源集合

Flux<User> usersFlux = webClient.get().uri("/users").retrieve().bodyToFlux(User.class); // 解析为 Flux<User>// 调用示例
usersFlux.subscribe(user -> System.out.println("Received user: " + user),error -> System.err.println("Error: " + error),() -> System.out.println("All users received")
);

关键点

  • bodyToFlux:用于解析多个对象的场景(如返回 JSON 数组)。

(3) POST 请求:提交数据

User newUser = new User("Alice");
Mono<User> createdUser = webClient.post().uri("/users").bodyValue(newUser)                // 设置请求体.retrieve().bodyToMono(User.class);           // 解析响应体// 调用示例
createdUser.subscribe(user -> System.out.println("Created user: " + user),error -> System.err.println("Error: " + error)
);

详细说明

  • bodyValue(newUser):将对象序列化为 JSON 并作为请求体发送。

  • 默认使用 Jackson 库进行 JSON 序列化/反序列化。

(4) PUT 请求:更新资源

User updatedUser = new User("Alice Updated");
Mono<Void> updateResult = webClient.put().uri("/users/{id}", 1).bodyValue(updatedUser).retrieve().bodyToMono(Void.class);           // 无响应体时使用updateResult.subscribe(() -> System.out.println("User updated"),error -> System.err.println("Error: " + error)
);

(5) DELETE 请求:删除资源

Mono<Void> deleteResult = webClient.delete().uri("/users/{id}", 1).retrieve().bodyToMono(Void.class);deleteResult.subscribe(() -> System.out.println("User deleted"),error -> System.err.println("Error: " + error)
);

(6)如何传递请求头?
使用 .headers() 方法添加自定义头:

webClient.get().uri("/users/{userId}", userId).header("X-Auth-Token", "my-token").retrieve().bodyToMono(User.class);

5.4. 处理响应与错误

(1) 处理 HTTP 状态码

Mono<User> userMono = webClient.get().uri("/users/{id}", 1).exchangeToMono(response -> {      // 手动处理完整响应if (response.statusCode().is2xxSuccessful()) {return response.bodyToMono(User.class);} else if (response.statusCode() == HttpStatus.NOT_FOUND) {return Mono.error(new UserNotFoundException("User not found"));} else {return Mono.error(new RuntimeException("Server error"));}});

关键点

  • exchangeToMono:获取完整的响应对象(包含状态码、头信息),适合需要精细控制的场景。

  • onStatus() 简化错误处理

    webClient.get().uri("/users/{id}", 1).retrieve().onStatus(HttpStatusCode::is4xxClientError, response ->Mono.error(new ClientErrorException())).onStatus(HttpStatusCode::is5xxServerError, response ->Mono.error(new ServerErrorException())).bodyToMono(User.class);

(2) 重试机制

webClient.get().uri("/users/{id}", 1).retrieve().bodyToMono(User.class).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 指数退避重试

参数说明

  • maxAttempts(3):最多重试 3 次。

  • backoff(Duration.ofSeconds(1)):每次重试间隔递增(1s, 2s, 4s)。

(3) 超时控制

webClient.get().uri("/users/{id}", 1).retrieve().bodyToMono(User.class).timeout(Duration.ofSeconds(5))    // 设置超时时间.onErrorResume(TimeoutException.class, e ->Mono.error(new ServiceUnavailableException()));

5.5. 复杂场景处理

(1) 文件上传

MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", new FileSystemResource("test.txt")); // 添加文件
builder.part("name", "My File");                          // 添加表单字段Mono<String> result = webClient.post().uri("/upload").body(BodyInserters.fromMultipartData(builder.build())) // 构建 Multipart 请求.retrieve().bodyToMono(String.class);

(2) 流式响应处理(Server-Sent Events)

Flux<String> eventStream = webClient.get().uri("/events").accept(MediaType.TEXT_EVENT_STREAM) // 声明接收 SSE 流.retrieve().bodyToFlux(String.class);eventStream.subscribe(event -> System.out.println("Received event: " + event),error -> System.err.println("Error: " + error),() -> System.out.println("Stream completed")
);

5.6. 异步流整合

(1) 链式调用多个请求

Mono<Order> orderMono = webClient.get().uri("/users/{id}", 1).retrieve().bodyToMono(User.class).flatMap(user -> webClient.post().uri("/orders").bodyValue(new Order(user.getId())).retrieve().bodyToMono(Order.class));

关键点

  • flatMap:将前一个 Mono 的结果传递给下一个异步操作。

(2) 并行调用多个请求

Mono<User> userMono = webClient.get().uri("/users/1").retrieve().bodyToMono(User.class);
Mono<Order> orderMono = webClient.get().uri("/orders/1").retrieve().bodyToMono(Order.class);// 合并结果
Mono<UserOrder> userOrderMono = Mono.zip(userMono, orderMono).map(tuple -> new UserOrder(tuple.getT1(), tuple.getT2()));

说明

  • Mono.zip:并行执行多个请求,所有结果就绪后合并。


5.7. 总结

步骤Spring MVC (RestTemplate)Spring WebFlux (WebClient)
发起请求restTemplate.getForObject()webClient.get().retrieve().bodyToMono()
错误处理try-catch.onStatus() + .onErrorResume()
超时控制手动设置线程超时.timeout(Duration)
文件上传MultiValueMap + postForEntity()MultipartBodyBuilder + BodyInserters
流式处理不支持支持 Server-Sent Events (SSE)

6. Spring WebFlux 文件上传与下载详细指南

6.1. 文件上传

6.1.1 添加依赖

确保项目中包含 spring-boot-starter-webflux

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

6.1.2 配置上传限制

在 application.yml 中配置最大文件大小和请求大小:

spring:webflux:multipart:max-file-size: 10MB      # 单个文件最大大小max-request-size: 20MB   # 整个请求最大大小

6.1.3 接收单个文件上传

import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.FilePart;
import reactor.core.publisher.Mono;@PostMapping("/upload")
public Mono<String> uploadFile(@RequestPart("file") FilePart filePart) {// 获取文件名(需防范路径遍历攻击)String fileName = filePart.filename().replaceAll("[^a-zA-Z0-9.-]", "_");Path path = Paths.get("uploads", fileName);// 将文件内容写入磁盘(非阻塞方式)return DataBufferUtils.write(filePart.content(), path, StandardOpenOption.CREATE).then(Mono.just("File uploaded: " + fileName));
}

关键点

  • @RequestPart("file"):绑定上传的文件参数。

  • 文件名清洗:替换非法字符,防止路径遍历攻击。

  • DataBufferUtils.write:非阻塞写入文件。

单个文件上传:DataBufferUtils.write 参数详解

方法签名

public static Mono<Void> write(Publisher<DataBuffer> source, // 输入的数据流(通常是 FilePart.content())Path destination,             // 目标文件路径OpenOption... options         // 文件打开选项(如 CREATE、WRITE)
)

参数解释

  • source: Publisher<DataBuffer>

    • 数据来源,通常为 FilePart.content(),表示上传文件的二进制数据流。

    • FilePart 是 Spring 对上传文件的抽象,content() 方法返回 Flux<DataBuffer>

  • destination: Path

    • 文件保存路径,需使用 Paths.get 创建。

    • 示例:Path path = Paths.get("uploads", fileName);

  • options: OpenOption...

    • 控制文件写入方式的选项,常用值:

      • StandardOpenOption.CREATE:如果文件不存在则创建。

      • StandardOpenOption.TRUNCATE_EXISTING:如果文件存在则清空。

      • StandardOpenOption.WRITE:允许写入。

6.1.4 接收多个文件上传

@PostMapping("/upload-multi")
public Mono<String> uploadMultipleFiles(@RequestPart("files") List<FilePart> fileParts) {return Flux.fromIterable(fileParts).flatMap(filePart -> {String fileName = sanitizeFilename(filePart.filename());Path path = Paths.get("uploads", fileName);return DataBufferUtils.write(filePart.content(), path, StandardOpenOption.CREATE);}).then(Mono.just("All files uploaded"));
}private String sanitizeFilename(String filename) {return filename.replaceAll("[^a-zA-Z0-9.-]", "_");
}

6.1.5 处理大文件(分块上传与进度监控)

分块上传代码

@PostMapping(value = "/upload-large", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Mono<String> uploadLargeFile(@RequestBody Flux<PartEvent> partEvents) {return partEvents// 按文件分块(直到遇到最后一个块).windowUntil(PartEvent::isLast) // 按顺序处理每个块.concatMap(partEventFlux -> partEventFlux// 过滤出文件数据块.ofType(FilePartEvent.class)// 提取数据内容.map(FilePartEvent::content)// 合并所有 DataBuffer 为单个缓冲区.as(content -> DataBufferUtils.join(content)))// 处理合并后的数据块.flatMap(dataBuffer -> {return Mono.fromRunnable(() -> {try (InputStream is = dataBuffer.asInputStream(true)) {// 将数据追加写入文件Files.copy(is, Paths.get("uploads/large-file.dat"), StandardCopyOption.REPLACE_EXISTING);} catch (IOException e) {throw new RuntimeException(e);} finally {// 释放缓冲区内存DataBufferUtils.release(dataBuffer);}});}).then(Mono.just("Large file uploaded"));
}

关键概念

  • PartEvent:Spring 5.3+ 新增的 API,表示多部分请求中的每个事件(如文件头、数据块、请求结束)。
  • windowUntil:将数据流按条件分块(此处按 isLast 判断是否结束)。
  • concatMap:保证块按顺序处理。
  • DataBufferUtils.join:将多个 DataBuffer 合并为一个。

这里的后端代码是基于 Spring WebFlux 的流式分块接收PartEvent),其设计初衷是配合支持 HTTP/2 流式上传的前端框架(如浏览器中的 Fetch API + Streams API)。但微信小程序的网络请求 API 存在以下限制

  • 不支持流式上传:小程序 wx.request 和 wx.uploadFile API 只能发送完整请求体,无法直接发送 Flux<DataBuffer> 流。

  • 无法触发 PartEvent 分块逻辑:后端 windowUntil(PartEvent::isLast) 依赖前端按流式分块发送数据,而小程序只能发送完整的 multipart/form-data 请求。

问题点原 Spring WebFlux 接口适配小程序方案
分块触发方式依赖流式 PartEvent 自动分块前端手动切割文件并多次调用上传接口
请求类型单一请求持续发送流式数据多个独立请求
后端存储逻辑流式写入同一文件按分块编号存储临时文件
断点续传支持需自行实现通过记录已上传分块实现

添加进度监控

AtomicLong bytesWritten = new AtomicLong(0);.flatMap(dataBuffer -> {long currentBytes = dataBuffer.readableByteCount();bytesWritten.addAndGet(currentBytes);System.out.println("Progress: " + bytesWritten.get() + " bytes");// ...原有写入逻辑...
})

6.2. 文件下载

6.2.1 下载静态文件

@GetMapping("/download/{filename:.+}")
public Mono<Void> downloadFile(@PathVariable String filename, ServerHttpResponse response
) {// 1. 构建文件路径Path filePath = Paths.get("uploads", filename);Resource resource = new UrlResource(filePath.toUri());// 2. 检查文件是否存在if (!resource.exists() || !resource.isReadable()) {return Mono.error(new FileNotFoundException("File not found"));}// 3. 设置响应头response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);response.getHeaders().setContentDisposition(ContentDisposition.attachment().filename(filename).build());// 4. 零拷贝写入响应ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;return zeroCopyResponse.writeWith(resource, 0, resource.contentLength());
}

浏览器行为

  • Content-Type: application/octet-stream:告诉浏览器这是一个二进制文件,无法直接预览。

  • Content-Disposition: attachment; filename="file.txt":强制浏览器弹出下载对话框。

  • 这是 HTTP 协议的标准行为,所有遵循协议的客户端(包括 Postman)都会根据这些头处理响应。

非浏览器客户端(如微信小程序)

  • 微信小程序不会自动触发下载,需要开发者手动处理:

    • 小程序前端通过 wx.downloadFile API 下载文件。

    • 开发者需将文件 URL 返回给前端,前端根据业务逻辑自行处理文件(如保存到本地、预览等)。

  • 关键点:无论客户端类型,设置这些头是服务端的责任,但实际下载行为由客户端实现。

示例代码(小程序端)

wx.downloadFile({url: 'https://your-api.com/download/file.txt',success(res) {wx.saveFile({tempFilePath: res.tempFilePath,success(savedRes) {console.log('文件保存成功:', savedRes.savedFilePath);}});}
});

为什么下载可以使用零拷贝(从云存储中下载不行),而上传不行?

  • 下载场景

    • 零拷贝机制:下载时,文件内容需要从磁盘读取并通过网络发送到客户端。零拷贝技术(如 Linux 的 sendfile)允许数据直接从磁盘文件描述符传输到网络套接字,绕过用户空间的内存复制,显著减少 CPU 和内存开销。

    • 适用性:适用于服务器主动发送静态文件的场景。

  • 上传场景

    • 数据流向:上传时,客户端将文件数据通过 HTTP 请求发送到服务器,服务器需要接收数据并处理(如保存到磁盘或云存储)。

    • 技术限制:上传的数据来自网络流,必须经过内核缓冲区 → 用户空间 → 磁盘/云存储的路径,无法绕过用户空间直接写入目标位置。

    • 结论:上传无法直接使用零拷贝技术,但可通过流式处理(Flux<DataBuffer>)减少内存占用。

  • 若文件存储在非本地磁盘(如云存储),需直接返回 URL,无法使用零拷贝。

6.2.2 动态生成文件并下载

@GetMapping("/generate-csv")
public Mono<Void> generateCsv(ServerHttpResponse response) {// 1. 设置响应头response.getHeaders().setContentType(MediaType.TEXT_PLAIN);response.getHeaders().setContentDisposition(ContentDisposition.attachment().filename("data.csv").build());// 2. 生成 CSV 数据流Flux<String> csvLines = Flux.just("Name,Age", "Alice,30", "Bob,25");// 3. 将字符串转换为字节缓冲区并写入响应return response.writeWith(csvLines.map(line -> response.bufferFactory().wrap(line.getBytes(StandardCharsets.UTF_8))));
}
  • 设置响应头,指定文件名和内容类型。

  • 创建包含 CSV 行的 Flux

  • 将每行字符串转换为 DataBuffer 并写入响应。

6.2.3 断点续传(Range 请求支持)

作用

  • 允许客户端在下载中断后,从断点继续下载,避免重新传输整个文件。

  • 节省带宽和时间,提升大文件下载体验。

流程

  • 客户端发起请求:通过 Range 头指定需要下载的字节范围(如 Range: bytes=0-999)。

  • 服务端响应

    • 检查文件是否存在,解析 Range 头。

    • 返回状态码 206 Partial Content 和 Content-Range 头(如 Content-Range: bytes 0-999/2000)。

    • 仅传输指定范围的字节。

  • 客户端处理

    • 将接收到的数据追加到本地临时文件。

    • 如果下载中断,记录已下载的字节位置,下次请求时从该位置继续。

@GetMapping("/download-large/{filename:.+}")
public Mono<Void> downloadLargeFile(@PathVariable String filename, ServerHttpRequest request, ServerHttpResponse response) {Path filePath = Paths.get("uploads", filename);Resource resource = new UrlResource(filePath.toUri());if (!resource.exists() || !resource.isReadable()) {return Mono.error(new FileNotFoundException("File not found: " + filename));}// 解析 Range 头List<HttpRange> ranges = request.getHeaders().getRange();long fileSize = resource.contentLength();if (ranges.isEmpty()) {// 完整下载response.getHeaders().setContentLength(fileSize);return response.writeWith(DataBufferUtils.read(resource, response.bufferFactory(), 4096));} else {// 处理分块请求HttpRange range = ranges.get(0);long start = range.getRangeStart(fileSize);long end = range.getRangeEnd(fileSize);long length = end - start + 1;response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);response.getHeaders().setContentLength(length);response.getHeaders().set(HttpHeaders.CONTENT_RANGE, "bytes " + start + "-" + end + "/" + fileSize);response.setStatusCode(HttpStatus.PARTIAL_CONTENT);return response.writeWith(DataBufferUtils.read(resource, response.bufferFactory(), 4096, start));}
}

6.3. 错误处理

6.3.1 全局异常处理

@ControllerAdvice
public class FileExceptionHandler {@ExceptionHandler(FileNotFoundException.class)public ResponseEntity<Mono<String>> handleFileNotFound(FileNotFoundException ex) {return ResponseEntity.status(HttpStatus.NOT_FOUND).body(Mono.just(ex.getMessage()));}@ExceptionHandler(DataBufferLimitException.class)public ResponseEntity<Mono<String>> handleSizeExceeded(DataBufferLimitException ex) {return ResponseEntity.status(HttpStatus.PAYLOAD_TOO_LARGE).body(Mono.just("File size exceeds limit"));}
}

6.3.2 上传文件大小限制错误

在 application.yml 中配置错误页面(可选):

spring:webflux:multipart:max-file-size: 10MBmax-request-size: 20MB

如果文件超过 max-file-size,Spring 会抛出 DataBufferLimitException

如果请求总大小超过 max-request-size,抛出 MaxUploadSizeExceededException

处理方式

  • 全局异常处理

    @ExceptionHandler(DataBufferLimitException.class)
    public ResponseEntity<String> handleFileSizeExceeded() {return ResponseEntity.status(HttpStatus.PAYLOAD_TOO_LARGE).body("File size exceeds limit");
    }
  • 前端提示:返回 HTTP 413 状态码,提示用户压缩文件或分块上传。


6.4. 安全与优化

6.4.1 文件名安全处理

private String sanitizeFilename(String filename) {// 移除路径信息String safeName = filename.replaceAll("^.*[\\\\/]", "");// 替换非法字符return safeName.replaceAll("[^a-zA-Z0-9.-]", "_");
}

6.4.2 病毒扫描(集成 ClamAV)

ClamAV 的作用

  • 目的:检测上传文件是否包含病毒或恶意软件。

  • 原理:ClamAV 是一个开源杀毒引擎,通过 TCP 连接调用其守护进程(clamd)进行扫描。

// 1. 添加 ClamAV 客户端依赖
<dependency><groupId>com.github.clam</groupId><artifactId>clam-client</artifactId><version>1.0.0</version>
</dependency>// 2. 扫描服务类
public class ClamAvService {private final ClamAVClient clamAVClient;public ClamAvService() {this.clamAVClient = new ClamAVClient("localhost", 3310);}public Mono<Boolean> scanFile(Path filePath) {return Mono.fromCallable(() -> {byte[] reply = clamAVClient.scan(filePath);return ClamAVClient.isCleanReply(reply);}).subscribeOn(Schedulers.boundedElastic()); // 在阻塞线程池执行}
}// 3. 在控制器中使用
@PostMapping("/safe-upload")
public Mono<String> safeUpload(@RequestPart("file") FilePart filePart) {Path path = Paths.get("uploads", sanitizeFilename(filePart.filename()));return DataBufferUtils.write(filePart.content(), path).then(clamAvService.scanFile(path)).flatMap(isClean -> {if (!isClean) {Files.delete(path); // 删除染毒文件return Mono.error(new VirusFoundException());}return Mono.just("File is safe");});
}

6.4.3 异步存储到云服务(如 COS)

@Service
public class CosService {@Autowiredprivate COSClient cosClient;@Value("${tencent.cos.bucket}")private String bucketName;public Mono<String> uploadToCos(FilePart filePart) {String fileName = sanitizeFilename(filePart.filename());String cosKey = "uploads/" + fileName;// 将文件内容转换为字节流Flux<DataBuffer> contentFlux = filePart.content();Flux<byte[]> bytesFlux = contentFlux.map(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);return bytes;});// 异步上传return Mono.create(sink -> {TransferManager transferManager = new TransferManager(cosClient);Upload upload = transferManager.upload(bucketName, cosKey, Channels.newChannel(new FluxInputStream(bytesFlux)), // 自定义输入流new ObjectMetadata());upload.addProgressListener((ProgressListener) progressEvent -> {System.out.println("Upload progress: " + progressEvent.getBytesTransferred());});try {upload.waitForUploadResult();sink.success("https://" + bucketName + ".cos." + region + ".myqcloud.com/" + cosKey);} catch (InterruptedException e) {sink.error(e);}}).subscribeOn(Schedulers.boundedElastic());}
}// 自定义 Flux 转 InputStream
class FluxInputStream extends InputStream {private final Iterator<byte[]> iterator;private ByteArrayInputStream currentStream;public FluxInputStream(Flux<byte[]> flux) {this.iterator = flux.toIterable().iterator();}@Overridepublic int read() throws IOException {if (currentStream == null || currentStream.available() == 0) {if (!iterator.hasNext()) return -1;currentStream = new ByteArrayInputStream(iterator.next());}return currentStream.read();}
}
  • 将上传文件的 Flux<DataBuffer> 转换为 Flux<byte[]>

  • 通过 TransferManager 异步上传到 COS。

  • 添加进度监听器。

  • 返回上传后的文件 URL。

@PostMapping("/upload-to-cos")
public Mono<String> uploadToCos(@RequestPart("file") FilePart filePart) {return cosService.uploadToCos(filePart).onErrorResume(e -> Mono.just("Upload failed: " + e.getMessage()));
}

6.5. 测试文件上传与下载

6.5.1 上传测试

@SpringBootTest
@AutoConfigureWebTestClient
class FileControllerTest {@Autowiredprivate WebTestClient webTestClient;@Testvoid testFileUpload() {Resource file = new FileSystemResource("test.txt");webTestClient.post().uri("/upload").contentType(MediaType.MULTIPART_FORM_DATA).bodyValue(generateMultipartBody(file)).exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("File uploaded: test.txt");}private MultiValueMap<String, HttpEntity<?>> generateMultipartBody(Resource file) {MultipartBodyBuilder builder = new MultipartBodyBuilder();builder.part("file", file).filename("test.txt");return builder.build();}
}

6.5.2 下载测试

@Test
void testFileDownload() throws IOException {Path testFile = Files.write(Paths.get("uploads/test.txt"), "Hello World".getBytes());webTestClient.get().uri("/download/test.txt").exchange().expectStatus().isOk().expectHeader().contentType(MediaType.APPLICATION_OCTET_STREAM).expectHeader().contentDisposition(ContentDisposition.attachment().filename("test.txt").build()).expectBody(String.class).isEqualTo("Hello World");Files.delete(testFile);
}

6.6. 性能优化

6.6.1 调整缓冲区大小

在 application.yml 中配置:

spring:webflux:max-in-memory-size: 1MB      # 内存缓冲区大小(超过部分写入磁盘)
  • 内存缓冲区大小:控制文件上传或请求体解析时,内存中缓存的最大数据量

  • 设计目的:防止大文件上传占用过多内存,导致应用 OOM(内存溢出)。

超限处理流程

  • 小文件上传(<1MB)

    • 整个文件内容缓存在内存中,快速处理。

  • 大文件上传(>1MB)

    • 内存部分:前 1MB 数据保留在内存缓冲区。

    • 超出部分:自动写入磁盘临时文件(路径由 java.io.tmpdir 指定)。

    • 最终处理:所有数据块(内存 + 磁盘临时文件)合并后传递给业务代码。

代码示例与验证

@PostMapping("/upload")
public Mono<String> upload(@RequestPart("file") FilePart filePart) {// 即使上传 100MB 文件,内存占用不会超过 1MBreturn DataBufferUtils.write(filePart.content(), Paths.get("uploads/file.dat")).then(Mono.just("Upload success"));
}

注意事项

  • 临时文件清理:Spring 不会自动删除临时文件,需配置定时任务或钩子清理。

  • 性能影响:频繁的磁盘 I/O 可能降低性能,建议根据服务器内存调整此值(如设置为 10MB)。

6.6.2 分块传输编码

对于大文件下载,自动启用分块传输:

response.getHeaders().set(HttpHeaders.TRANSFER_ENCODING, "chunked");

6.7. 完整配置示例

6.7.1 文件存储目录自动创建

@PostConstruct
public void init() throws IOException {Path uploadDir = Paths.get("uploads");if (!Files.exists(uploadDir)) {Files.createDirectories(uploadDir);}
}

6.7.2 清理临时文件

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.nio.file.*;@Configuration
public class TempFileCleanup {private Path uploadDir = Paths.get("uploads");// 启动时创建目录@PostConstructpublic void init() throws IOException {if (!Files.exists(uploadDir)) {Files.createDirectories(uploadDir);}}// 关闭时清理文件@PreDestroypublic void cleanup() throws IOException {Files.walk(uploadDir).filter(Files::isRegularFile).forEach(path -> {try {Files.delete(path);System.out.println("Deleted: " + path);} catch (IOException e) {System.err.println("Failed to delete: " + path);}});}
}
  • 生产环境:建议使用定时任务清理过期文件(如 Quartz 或 Spring Scheduler)。


6.8 总结

功能实现方案
文件上传FilePart + DataBufferUtils.write
文件下载Resource + 零拷贝写入
大文件处理分块上传(PartEvent)、断点续传(Range 头)
安全性文件名清洗、病毒扫描
云存储集成腾讯云(COS)
性能优化零拷贝、分块传输、堆外内存配置
测试WebTestClient 模拟多部分请求

7. WebSocket 集成

7.1. 项目初始化与依赖配置

创建 Spring Boot 项目

使用 Spring Initializr 生成项目,选择以下依赖:

  • Spring Reactive Web(包含 WebFlux)

  • Lombok(简化代码,可选)

添加必要依赖

确保 pom.xml 包含 WebFlux 依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

7.2. 配置 WebSocket 路由

定义路由映射

创建配置类 WebSocketConfig.java,将 WebSocket 路径映射到自定义的 WebSocketHandler

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;@Configuration
public class WebSocketConfig {@Beanpublic HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) {Map<String, WebSocketHandler> pathMap = new HashMap<>();pathMap.put("/ws/chat", webSocketHandler); // 绑定路径到处理器SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();mapping.setUrlMap(pathMap);mapping.setOrder(-1); // 设置最高优先级return mapping;}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
  • 关键点

    • SimpleUrlHandlerMapping 将 URL 路径 /ws/chat 映射到自定义的 WebSocketHandler

    • WebSocketHandlerAdapter 是 Spring WebFlux 处理 WebSocket 的核心适配器。


7.3. 实现 WebSocketHandler

自定义处理器

创建 ChatWebSocketHandler.java,实现 WebSocketHandler 接口,管理会话和处理消息:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;@Component
public class ChatWebSocketHandler implements WebSocketHandler {// 使用线程安全的集合存储所有活跃会话private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();@Overridepublic Mono<Void> handle(WebSocketSession session) {// 1. 新连接加入会话池sessions.add(session);System.out.println("[连接建立] 会话ID: " + session.getId());// 2. 处理客户端发送的消息流return session.receive() // 返回 Flux<WebSocketMessage>.flatMap(message -> processMessage(message, session)).doFinally(signal -> {// 3. 连接关闭时清理会话sessions.remove(session);System.out.println("[连接关闭] 会话ID: " + session.getId());}).then(); // 返回 Mono<Void> 表示处理完成}// 处理单条消息private Mono<Void> processMessage(WebSocketMessage message, WebSocketSession sender) {if (message.getType() == WebSocketMessage.Type.TEXT) {String payload = message.getPayloadAsText();System.out.println("[收到消息] 来自 " + sender.getId() + ": " + payload);return broadcast(payload, sender); // 广播消息}return Mono.empty(); // 忽略非文本消息}// 广播消息给所有其他用户private Mono<Void> broadcast(String message, WebSocketSession sender) {return Flux.fromIterable(sessions).filter(session -> !session.equals(sender)) // 排除发送者.flatMap(session -> session.send(Mono.just(session.textMessage(formatMessage(message, sender))))).then(); // 合并所有发送操作为一个 Mono<Void>}// 格式化消息(可自定义协议)private String formatMessage(String message, WebSocketSession sender) {return String.format("[用户 %s]: %s", sender.getId(), message);}
}

代码逻辑详解

  • 会话管理

    • sessions 使用 CopyOnWriteArrayList,保证线程安全(读多写少场景)。

    • 新连接建立时,sessions.add(session) 将会话加入列表。

    • 连接关闭时,doFinally 回调中移除会话,防止内存泄漏。

  • 消息处理

    • session.receive() 返回 Flux<WebSocketMessage>,表示客户端发送的消息流。

    • flatMap 对每条消息进行处理,支持非阻塞操作。

    • processMessage 处理文本消息,忽略二进制等其他类型消息。

  • 广播机制

    • Flux.fromIterable(sessions) 遍历所有会话。

    • filter 排除发送者,避免消息回传。

    • session.send(...) 发送消息,Mono.just(...) 创建文本消息。

  • 响应式流控制

    • flatMap 将每条消息转换为广播操作流。

    • then() 将多个异步操作合并为 Mono<Void>,表示整体完成。


7.4. 高级扩展功能

握手阶段鉴权

在握手阶段验证 Token 或用户身份:

import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;public class CustomHandshakeHandler extends HandshakeWebSocketService {public CustomHandshakeHandler(RequestUpgradeStrategy upgradeStrategy) {super(upgradeStrategy);}@Overridepublic Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {// 从请求头获取 TokenHttpHeaders headers = exchange.getRequest().getHeaders();String token = headers.getFirst("Authorization");if (!isValidToken(token)) {return Mono.error(new SecurityException("无效的 Token"));}// 继续处理握手return super.handleRequest(exchange, handler);}private boolean isValidToken(String token) {// 实现 Token 验证逻辑return true;}
}

在配置类中替换默认的 WebSocketService

@Bean
public WebSocketService webSocketService() {return new CustomHandshakeHandler(new ReactorNettyRequestUpgradeStrategy());
}

心跳检测

配置底层 Netty 心跳机制:

import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.netty.http.server.HttpServer;@Configuration
public class NettyConfig {@Beanpublic WebServerFactoryCustomizer<NettyReactiveWebServerFactory> webServerFactoryCustomizer() {return factory -> factory.addServerCustomizers(httpServer -> httpServer.tcpConfiguration(tcpServer ->tcpServer.option(ChannelOption.SO_KEEPALIVE, true)));}
}

消息协议设计

使用 JSON 格式传递结构化数据:

import com.fasterxml.jackson.databind.ObjectMapper;// 消息实体类
@Data // Lombok 注解
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {private String sender;private String content;private long timestamp;
}// 修改广播方法
private Mono<Void> broadcast(String rawMessage, WebSocketSession sender) {ObjectMapper mapper = new ObjectMapper();try {ChatMessage message = new ChatMessage(sender.getId(),rawMessage,System.currentTimeMillis());String json = mapper.writeValueAsString(message);return Flux.fromIterable(sessions).filter(session -> !session.equals(sender)).flatMap(session -> session.send(Mono.just(session.textMessage(json)))).then();} catch (Exception e) {return Mono.error(e);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893704.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue2下篇

插槽&#xff1a; 基本插槽&#xff1a; 普通插槽&#xff1a;父组件向子组件传递静态内容。基本插槽只能有一个slot标签&#xff0c;因为这个是默认的位置&#xff0c;所以只能有一个 <!-- ParentComponent.vue --> <template> <ChildComponent> <p>…

【科研建模】Pycaret自动机器学习框架使用流程及多分类项目实战案例详解

Pycaret自动机器学习框架使用流程及项目实战案例详解 1 Pycaret介绍2 安装及版本需求3 Pycaret自动机器学习框架使用流程3.1 Setup3.2 Compare Models3.3 Analyze Model3.4 Prediction3.5 Save Model4 多分类项目实战案例详解4.1 ✅ Setup4.2 ✅ Compare Models4.3 ✅ Experime…

Linux学习笔记——网络管理命令

一、网络基础知识 TCP/IP四层模型 以太网地址&#xff08;MAC地址&#xff09;&#xff1a; 段16进制数据 IP地址&#xff1a; 子网掩码&#xff1a; 二、接口管命令 ip命令&#xff1a;字符终端&#xff0c;立即生效&#xff0c;重启配置会丢失 nmcli命令&#xff1a;字符…

手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion(原理介绍)

手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion&#xff08;原理介绍&#xff09; 目录 手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion&#xff08;原理介绍&#xff09;DDPM 原理图Stable Diffusion 原理Stable Diffusion的原理解释Stable Diffusion 和 Diffus…

JAVAweb学习日记(八) 请数据库模型MySQL

一、MySQL数据模型 二、SQL语言 三、DDL 详细见SQL学习日记内容 四、DQL-条件查询 五、DQL-分组查询 聚合函数&#xff1a; 分组查询&#xff1a; 六、DQL-分组查询 七、分页查询 八、多表设计-一对多&一对一&多对多 一对多-外键&#xff1a; 一对一&#xff1a; 多…

微信小程序1.1 微信小程序介绍

1.1 微信小程序介绍 内容提要 1.1 什么是微信小程序 1.2 微信小程序的功能 1.3 微信小程序使用场景 1.4 微信小程序能取代App吗 1.5 微信小程序的发展历程 1.6微信小程序带来的机会

音频入门(一):音频基础知识与分类的基本流程

音频信号和图像信号在做分类时的基本流程类似&#xff0c;区别就在于预处理部分存在不同&#xff1b;本文简单介绍了下音频处理的方法&#xff0c;以及利用深度学习模型分类的基本流程。 目录 一、音频信号简介 1. 什么是音频信号 2. 音频信号长什么样 二、音频的深度学习分…

Midjourney中的强变化、弱变化、局部重绘的本质区别以及其有多逆天的功能

开篇 Midjourney中有3个图片“微调”&#xff0c;它们分别为&#xff1a; 强变化&#xff1b;弱变化&#xff1b;局部重绘&#xff1b; 在Discord里分别都是用命令唤出的&#xff0c;但如今随着AI技术的发达在类似AI可人一类的纯图形化界面中&#xff0c;我们发觉这样的逆天…

【Linux】命令为桥,存在为岸,穿越虚拟世界的哲学之道

文章目录 Linux基础入门&#xff1a;探索操作系统的内核与命令一、Linux背景与发展历史1.1 Linux的起源与发展1.2 Linux与Windows的对比 二、Linux的常用命令2.1 ls命令 - "List"&#xff08;列出文件)2.2 pwd命令 - "Print Working Directory"&#xff08…

[护网杯 2018]easy_tornado1

题目 、 依次点击文件查看 /flag.txt flag in /fllllllllllllag /welcome.txt render /hints.txt md5(cookie_secretmd5(filename)) tornado模板注入 报cookie /error?msg{{handler.settings}} cookie_secret: 6647062b-e68d-4406-90d3-06e307fa955c} 使用python脚本…

STM32+W5500+以太网应用开发+003_TCP服务器添加OLED(u8g2)显示状态

STM32W5500以太网应用开发003_TCP服务器添加OLED&#xff08;u8g2&#xff09;显示状态 实验效果3-TCP服务器OLED1 拷贝显示驱动代码1.1 拷贝源代码1.2 将源代码添加到工程1.3 修改代码优化等级1.4 添加头文件路径1.5 修改STM32CubeMX工程 2 修改源代码2.1 添加头文件2.2 main函…

基于微信小程序的英语学习交流平台设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

ORB-SLAM2源码学习:Initializer.cc⑧: Initializer::CheckRT检验三角化结果

前言 ORB-SLAM2源码学习&#xff1a;Initializer.cc⑦: Initializer::Triangulate特征点对的三角化_cv::svd::compute-CSDN博客 经过上面的三角化我们成功得到了三维点&#xff0c;但是经过三角化成功的三维点并不一定是有效的&#xff0c;需要筛选才能作为初始化地图点。 …

macOS如何进入 Application Support 目录(cd: string not in pwd: Application)

错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下&#xff1a; 拼写错误或路径错误&#xff1a;确保你输入的目录名称正确。目录名称是区分大小写的&#xff0c;因此请确保使用正确的大小写。正确的目录名…

记录一个连不上docker中的mysql的问题

引言 使用的debian12,不同发行版可能有些许差异&#xff0c;连接使用的工具是navicat lite 本来是毫无思绪的&#xff0c;以前在云服务器上可能是防火墙的问题&#xff0c;但是这个桌面环境我压根没有使用防火墙。 直到 ying192:~$ mysql -h127.0.0.1 -uroot ERROR 1045 (28…

Gradle自定义任务指南 —— 释放构建脚本的无限可能

文章目录 &#x1f50d;Gradle任务⚙️ 自定义任务的5大核心配置项1. 任务注册&#xff08;Registering Tasks&#xff09;2. group & description3. dependsOn4. inputs & outputs5. 类型化任务&#xff08;Task Types&#xff09; 任务常见配置参数传递方式1&#xf…

windows11关闭系统更新详细操作步骤

文章目录 1.打开注册表2.修改注册表内容2.1 新建文件2.2 修改值 3.修改设置 1.打开注册表 winR输入regedit(如下图所示) 2.修改注册表内容 进HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings 2.1 新建文件 右侧界面右键即可 2.2 修改值 重命名为如下…

matlab绘图——彩色螺旋图

代码生成的图形是一个动态的彩色螺旋&#xff0c;展示了如何利用极坐标和颜色映射创建视觉吸引力强的图形。该图形可以用于数据可视化、艺术创作或数学演示&#xff0c;展示了 MATLAB 在图形处理方面的强大能力。通过调整 theta 和 r 的范围&#xff0c;可以创建出不同形状和复…

啥是EPS?

文章目录 1. 什么是EPS?2. 主要构成3. EPS的设计如何符合功能安全?4. 代表性的厂家1. 什么是EPS? EPS(Electric Power Steering,电动助力转向系统)是一种利用电动机提供转向助力的系统,取代了传统的液压助力转向系统(HPS)。EPS通过传感器检测驾驶员的转向意图,并由电…

QT:控件属性及常用控件(3)-----输入类控件(正则表达式)

输入类控件既可以进行显示&#xff0c;也能让用户输入一些内容&#xff01; 文章目录 1.Line Edit1.1 用户输入个人信息1.2 基于正则表达式的文本限制1.3 验证两次输入的密码是否一致1.4 让输入的密码可以被查看 2.Text Edit2.1 输入和显示同步2.1 其他信号出发情况 3.ComboBox…