【微服务】spring webflux响应式编程使用详解

目录

一、webflux介绍

1.1 什么是webflux

1.2 什么是响应式编程

1.3 webflux特点

二、Java9中响应式编程

2.1 定义事件流源

2.2 实现订阅者

三、Spring Webflux介绍

四、Reactor 介绍

五、Reactor 常用API操作

5.1 Flux 创建流操作API

5.2 Flux响应流的订阅

5.3 Flux处理实时流

六、Spring Webflux 使用

6.1 Spring Webflux简介

6.1 Spring Webflux中的核心组件

6.2 Spring Webflux基于注解的实现

6.2.1 引入核心依赖

6.2.2 核心业务类

6.2.3 核心接口类

6.3 Spring Webflux 函数式编程实现

6.3.1 自定义handler

6.3.2 自定义server服务器

6.3.3 访问效果测试

6.3.4 使用webclient调用

6.4 Spring Boot RouterFunction 整合方式一

6.5 Spring Boot RouterFunction 整合方式二

6.5.1 静态化改造

七、webflux的使用场景

八、写在文末


一、webflux介绍

1.1 什么是webflux

webflux,即响应式编程。在JDK9中开始引入了响应式编程模型,而spring5.0版本之后正式引入对webflux的支持,即spring webflux,spring webflux是spring在5.0版本后提供的一套响应式编程风格的web开发框架。

1.2 什么是响应式编程

响应式编程是一种用于处理异步数据流和事件的编程范式。它的核心思想是将数据流看作是一系列事件的序列,通过对事件流的处理来实现计算。它强调基于事件的异步处理和函数式编程的思想,可以帮助开发人员更好地处理复杂的应用程序逻辑。

而响应式编程,其实就是为这种异步非阻塞的流式编程制定的一套标准。流式编程已不陌生了,Java8提供的stream api就是这种风格。这套标准包括对运行环境(JVM、JavaScript)以及网络协议相关的规范。

1.3 webflux特点

非阻塞式

在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程

函数式编程

Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求

二、Java9中响应式编程

Java 9引入了Flow API作为响应式编程的标准实现,具体来说:

  • Flow API提供了一组接口和类,用于定义和处理数据流;

  • 它基于Publisher-Subscriber模式,其中Publisher生成事件流并发布给Subscriber进行处理。

如果使用Java9中的响应式编程进行实现,核心需要两步:

  • 定义事件流源;
  • 实现订阅者;

下面来看一段具体的实现代码。

2.1 定义事件流源

在Flow API中,事件流源被定义为Publisher的实现类,具体来说,首先需要创建一个类实现Publisher接口,并重写其subscribe()方法。在subscribe()方法中,可以通过调用Subscriber的onSubscribe()方法来将事件流订阅给Subscriber。

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class EventPublisher implements Flow.Publisher<String> {@Overridepublic void subscribe(Flow.Subscriber<? super String> subscriber) {subscriber.onSubscribe(new SimpleSubscription(subscriber));}
}

2.2 实现订阅者

订阅者是实现Subscriber接口的类。在Flow API中,只需要实现Subscriber接口的onNext()、onError()和onComplete()方法;

  • 当事件流发出下一个元素时,onNext()方法将被调用;

  • 当发生错误时,onError()方法将被调用;

  • 当事件流结束时,onComplete()方法将被调用;

在这些方法中,我们可以根据业务需要添加处理事件流的数据相关逻辑。

import java.util.concurrent.Flow;public class EventSubscriber implements Flow.Subscriber<String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("Received item: " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("Event stream completed.");}
}

测试代码

import java.util.concurrent.Flow;public class Main {public static void main(String[] args) {EventPublisher publisher = new EventPublisher();EventSubscriber subscriber = new EventSubscriber();publisher.subscribe(subscriber);publisher.submit("Event 1");publisher.submit("Event 2");publisher.submit("Event 3");publisher.close();}
}

三、Spring Webflux介绍

是Spring5添加新的模块,用于web开发的,功能和SpringMVC类似的,Webflux使用当前一种比较流程响应式编程出现的框架。spring官方文档地址:Web on Reactive Stack :: Spring Framework

spring-webflux是spring web框架体系中的一个组成模块,说起这个WebFlux,不难会拿出来与Spring Web与WebMvc进行比较,因为在目前很多项目开发中,仍然会使用WebMVC进行开发,尽管springboot成为基础的开发框架,但是接口开发中核心组件还是WebMVC的进一步封装。

四、Reactor 介绍

可以这么理解,响应式编程中的核心实现在于Reactor 的实现和应用,具体来说,Reactor是满足Reactive规范框架。具体来说:

  • 对响应式流规范的一种实现;

  • Spring Webflux默认的响应式框架;

  • 完全异步非阻塞,对背压的支持;

  • 提供两个异步序列API,Flux[N]和Mono[0|1];

  • 提供对响应式流的操作;

在Reactor中,有两个核心类,Flux和Mono ,这两个类实现接口 Publisher,提供丰富操作符。

  • Flux 对象实现发布者,返回 N 个元素,即产生0到N个元素的异步序列;

  • Mono 实现发布者,返回 0 或者 1 个元素,即产生至多一个元素的异步序列。

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种数据信号特点:

  • 错误信号和完成信号都是终止信号,不能共存的;
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流;
  • 如果没有错误信号,没有完成信号,表示是无限数据流;

五、Reactor 常用API操作

接下来通过实际操作来演示下基于Reactor 常用的API的使用。引入如下依赖包。

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.5.5</version>
</dependency>

5.1 Flux 创建流操作API

在上面提到,如果你需要创建多于一个元素的异步序列,可以考虑使用Flux 相关API,下面是使用Flux 的创建多种形式流的操作

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;public class ReactorApi {@Testpublic void fluxJust() {Flux<String> phones = Flux.just("小米", "三星", "华为");}@Testpublic void fluxFromIterable() {Flux<String> phones = Flux.fromIterable(Arrays.asList("小米", "三星", "华为"));}@Testpublic void fluxFromArray() {Flux<String> phones = Flux.fromArray(new String[]{"小米", "三星", "华为"});}@Testpublic void fluxFromStream() {Flux<String> phones = Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"}));phones.subscribe();phones.subscribe(); //只能被订阅一次}@Testpublic void fluxEmpty() {Flux<String> phones = Flux.empty(); //generic type still honored}@Testpublic void fluxRange() {Flux<Integer> phones = Flux.range(5, 3);}@Testpublic void fluxGenerate() {Flux<Long> flux = Flux.generate(AtomicLong::new,(state, sink) -> {long i = state.getAndIncrement();sink.next(i);if (i == 10) sink.complete();return state;},(state) -> System.out.println("done"));flux.subscribe(System.out::println);}@Testpublic void fluxCreate() {Flux<String> phones = Flux.create((t) -> {t.next("小米");t.next("三星");t.next("华为");t.complete();});phones.subscribe(System.out::println);System.out.println("------------");Flux<String> ownFluxListener = Flux.create(sink -> {//传入自定义的方法new MyDataListener(){public void onReceiveData(String str){sink.next(str);}public void onComplete(){sink.complete();}};}, FluxSink.OverflowStrategy.DROP);ownFluxListener.subscribe(System.out::println);}public class MyDataListener{public void onReceiveData(String str){System.out.println("收到数据:"+str);}public void onComplete(){System.out.println("完成数据的消费处理");}}@Testpublic void fluxDefer() {Flux.defer(() -> Flux.just("小米", "三星", "华为")).subscribe(System.out::println);Flux<String> stockSeq4 = Flux.defer(() -> Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"})));stockSeq4.subscribe();stockSeq4.subscribe();}@Testpublic void fluxInterval() throws InterruptedException {//interval 定时发送元素Flux.interval(Duration.of(1, ChronoUnit.SECONDS)).subscribe((t) -> System.out.println((String.valueOf(t))));Thread.sleep(1000000);}}

5.2 Flux响应流的订阅

在上面的操作API中,调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。接下来,看看如何订阅和操作这些流。

import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.List;public class ReactorStreamApi {/*** 流的map操作*/@Testpublic void streamMap() {Flux<Integer> ints = Flux.range(1, 4);Flux<Integer> mapped = ints.map(i -> i * 2);mapped.subscribe(System.out::println);}/*** 带有异常情况的处理*/@Testpublic void withError() {Flux<Integer> ints = Flux.range(1, 4).map(i-> {if(i<3){return i;}throw new RuntimeException("大于3");});ints.subscribe(i-> System.out.println(i),err -> System.out.println("error : " + err.getMessage()),() -> System.out.println("完成订阅和数据的消费"));}@Testpublic void testSubscribeWithBase(){Flux<Integer> ints = Flux.range(1, 4);ints.subscribe(new MySubscriber<>());}public class MySubscriber<T> extends BaseSubscriber<T> {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("MySubscriber");request(1);}@Overrideprotected void hookOnNext(T value) {System.out.println(value.toString());request(1);}}/*** 流的filter操作*/@Testpublic void streamFilter() {Flux<Integer> ints = Flux.range(1, 4);Flux<Integer> filtered = ints.filter(i -> i % 2 == 0);filtered.subscribe(System.out::println);}@Testpublic void streamBuffer() {Flux<Integer> ints = Flux.range(1, 40);Flux<List<Integer>> buffered = ints.buffer(3);buffered.subscribe(System.out::println);}@Testpublic void streamRetry() {Mono<String> client = Mono.fromSupplier(() -> {double num = Math.random();if (num > 0.01) {throw new Error("Network issue");}return "https://www.baidu.com";});client.retry(3).subscribe(System.out::println);}/*** 响应式流的合并*/@Testpublic void streamZip(){Flux<Integer> fluxA = Flux.range(1, 4);Flux<Integer> fluxB = Flux.range(5, 5);fluxA.zipWith(fluxB, (a, b)-> a+b).subscribe(System.out::println);}}

5.3 Flux处理实时流

对于某些需要实时处理的场景,可以考虑Flux的实时流的处理

import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;public class StreamTest {@Testpublic void simpleHotStreamCreation() {Sinks.Many<Integer> hotSource = Sinks.unsafe().many().multicast().directBestEffort();//转为fluxFlux<Integer> hotFlux = hotSource.asFlux();//订阅数据hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));hotSource.emitNext(1, FAIL_FAST);hotSource.tryEmitNext(2).orThrow();hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));hotSource.emitNext(3, FAIL_FAST);hotSource.emitNext(4, FAIL_FAST);hotSource.emitComplete(FAIL_FAST);}@Testpublic void connectableFlux() throws InterruptedException {Flux<Integer> source = Flux.range(1, 4);ConnectableFlux<Integer> connectableFlux = source.publish();connectableFlux.subscribe(d -> System.out.println("Subscriber 1 gets " + d));connectableFlux.subscribe(d -> System.out.println("Subscriber 2 gets " + d));System.out.println("Finish subscribe action");Thread.sleep(1000L);System.out.println("Connect to Flux now");connectableFlux.connect();}@Testpublic void autoConnectConnectableFlux() throws InterruptedException {Flux<Integer> source = Flux.range(1, 4);Flux<Integer> autoConnect = source.publish().autoConnect(2);autoConnect.subscribe(d -> System.out.println("Subscriber 1 gets " + d));System.out.println("Finish subscriber 1 action");Thread.sleep(1000L);System.out.println("Start subscriber 2 action");autoConnect.subscribe(d -> System.out.println("Subscriber 2 gets " + d));}}

六、Spring Webflux 使用

6.1 Spring Webflux简介

在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况。因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。

在servlet3.0标准之后,为解决此类问题,提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他请求使用,这样的操作机制将极大的提升程序的并发性能。

对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。

在spring中实现响应式编程,就需要使用到spring webFlux。该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式(编程式实现),另一种是基于SpringMVC注解方式。

6.1 Spring Webflux中的核心组件

Spring Webflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻 塞的框架。Spring Webflux 执行过程和 SpringMVC 相似的 Spring Webflux 核心控制器 DispatchHandler,实现接口 WebHandler。

SpringWebflux 里面 DispatcherHandler,负责请求的处理,

  • HandlerMapping:请求查询到处理的方法

  • HandlerAdapter:真正负责请求处理

  • HandlerResultHandler:响应结果处理

SpringWebflux 实现函数式编程,两个接口:RouterFunction(路由处理)和 HandlerFunction(处理函数)

6.2 Spring Webflux基于注解的实现

6.2.1 引入核心依赖

注意,如果是在springboot项目中提供web接口,引入了下面的依赖之后就不要引入spring-boot-starter-web依赖了。

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

6.2.2 核心业务类

使用webflux编写web接口,与普通的rest-api类似,只是在webflux,返回值不再是对象或其他数据类型,而是Flux或Mono包装的数据对象。

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.HashMap;
import java.util.Map;@Service
public class BookServiceImpl implements BookService {//创建 map 集合存储数据private final Map<String,BookInfo> books = new HashMap<>();public BookServiceImpl() {this.books.put("01",new BookInfo("01","Java",20));this.books.put("02",new BookInfo("02","Js",30));this.books.put("03",new BookInfo("03","Hadoop",50));}@Overridepublic Mono<BookInfo> getById(String id) {return Mono.justOrEmpty(this.books.get(id));}@Overridepublic Flux<BookInfo> getAll() {return Flux.fromIterable(this.books.values());}@Overridepublic Mono<Void> saveBookInfo(Mono<BookInfo> bookInfoMono) {return bookInfoMono.doOnNext(book -> {//向 map 集合里面放值int id = books.size()+1;books.put(String.valueOf(id),book);}).thenEmpty(Mono.empty());}
}

6.2.3 核心接口类

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@RestController
public class BookController {@Autowiredprivate BookService bookService;//根据ID查询 http://localhost:8082/book/01@GetMapping("/book/{id}")public Mono<BookInfo> getById(@PathVariable String id) {return bookService.getById(id);}//查询所有 http://localhost:8082/findAll@GetMapping("/findAll")public Flux<BookInfo> getUsers() {return bookService.getAll();}@PostMapping("/save")public Mono<Void> save(@RequestBody BookInfo user) {Mono<BookInfo> userMono = Mono.just(user);return bookService.saveBookInfo(userMono);}}

选择其中一个接口测试,可以看到效果与传统的API接口返回值并无差别

补充说明:

1)SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat;

2)SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty;

6.3 Spring Webflux 函数式编程实现

在使用函数式编程模型操作时候,需要自己初始化服务器,基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。

Spring Webflux 请 求 和 响 应 不 再 是 ServletRequest 和 ServletResponse ,而是ServerRequest 和 ServerResponse

熟悉Netty的同学对Netty的编码风格不陌生,在编写Netty的服务时,也需要自定义Handler,然后将这个自定义Handler配置到启动配置参数中,因此可以同样的方式来理解Spring Webflux的函数式编程的套路。

6.3.1 自定义handler

可以这么理解,在这个handler类中,其实就是对底层的业务方法进一步的封装,只不过返回的数据类型为Mono或Flux;

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class BookHandler {private BookService bookService;public BookHandler(BookService bookService) {this.bookService = bookService;}/*** 根据ID查询* @param serverRequest* @return*/public Mono<ServerResponse> getBookById(ServerRequest serverRequest) {String id = serverRequest.pathVariable("id");Mono<BookInfo> bookInfoMono = this.bookService.getById(id);Mono<ServerResponse> noDataRes = ServerResponse.notFound().build();return bookInfoMono.flatMap(bookInfo ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(bookInfo, BookInfo.class).switchIfEmpty(noDataRes));}/*** 获取所有* @return*/public Mono<ServerResponse> getAllBooks(ServerRequest serverRequest) {//调用 service 得到结果Flux<BookInfo> bookInfoFlux = this.bookService.getAll();returnServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(bookInfoFlux,BookInfo.class);}/*** 保存数据* @param request* @return*/public Mono<ServerResponse> saveUser(ServerRequest request) {//得到 user 对象Mono<BookInfo> bookInfoMono = request.bodyToMono(BookInfo.class);returnServerResponse.ok().build(this.bookService.saveBookInfo(bookInfoMono));}

6.3.2 自定义server服务器

该类的作用就相当于是netty编程中,通过ServerBootstrap创建一个服务器类似;

import com.congge.handler.BookHandler;
import com.congge.service.BookService;
import com.congge.service.impl.BookServiceImpl;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.http.server.HttpServer;import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;public class BookServer {public RouterFunction routerFunction() {BookService bookService = new BookServiceImpl();BookHandler bookHandler = new BookHandler(bookService);//设置路由/*  return RouterFunctions.route(GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById).andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::getAllUsers);*/return RouterFunctions.route(RequestPredicates.GET("/users/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),bookHandler::getBookById).andRoute(RequestPredicates.GET("/users/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),bookHandler::getAllBooks);}public void createReactorServer() {//路由和 handler 适配RouterFunction<ServerResponse> route = routerFunction();HttpHandler httpHandler = toHttpHandler(route);ReactorHttpHandlerAdapter adapter = newReactorHttpHandlerAdapter(httpHandler);//创建服务器HttpServer httpServer = HttpServer.create();httpServer.handle(adapter).bindNow();}public static void main(String[] args) throws Exception{BookServer server = new BookServer();server.createReactorServer();System.out.println("enter to exit");System.in.read();}}

在该类的最后,编写了一个main函数,运行这个main程序,注意日志中的端口号,因为接下来将通过这个端口进行访问;

6.3.3 访问效果测试

访问接口:localhost:51315/book/01

6.3.4 使用webclient调用

也可以编写webclient调用上面的接口,代码如下

import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;public class ClientTest {public static void main(String[] args) {//调用服务器地址WebClient webClient = WebClient.create("http://127.0.0.1:51315");//根据 id 查询String id = "01";BookInfo bookInfo = webClient.get().uri("/book/{id}", id).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(BookInfo.class).block();System.out.println(bookInfo.getName());//查询所有Flux<BookInfo> results = webClient.get().uri("/book/findAll").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(BookInfo.class);results.map(stu -> stu.getName()).buffer().doOnNext(System.out::println).blockFirst();}}

6.4 Spring Boot RouterFunction 整合方式一

上面是通过自定义handler的方式实现了Spring Webflux函数式编程,如果直接在springboot中直接集成怎么做呢,只需要通过自定义配置bean的方式,将路由配置进去即可;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;@Configuration
public class MyRoutesConfig {@BeanRouterFunction<ServerResponse> index() {return route(GET("/index"), request -> ok().body(fromObject("Hello Index")));}@BeanRouterFunction<ServerResponse> about() {return route(GET("/about"), request -> ok().body(fromObject("About page")));}}

当然里面的逻辑非常简单,实际使用时,可以在每个bean中补充更复杂的逻辑,比如调用其他业务类的逻辑,同样我们启动springboot应用后访问下端点/index,看到下面的效果。

6.5 Spring Boot RouterFunction 整合方式二

紧接着上面的案例,下面使用更通用的做法来完成与RouterFunction 的整合,首先还是自定义一个handler,这种自定义的配置类形式handler好处是可以注入其他业务类,从而实现更复杂的逻辑。

import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@Component
public class ApiHandler {public Mono<ServerResponse> getNewBooks(ServerRequest serverRequest) {return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Flux.create(sink ->{sink.next(new BookInfo("05","mysql",90));sink.next(new BookInfo("06","flink",78));sink.next(new BookInfo("07","php",66));sink.complete();}),BookInfo.class);}public Mono<ServerResponse> getBookById(ServerRequest serverRequest) {String bookId = serverRequest.pathVariable("id");return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(new BookInfo(bookId,"python",57)),BookInfo.class);}}

自定义routerFunction,可以这么理解,通过这个类,就不用再单独编写一个controller,从而实现与普通的controller类中一样定义接口的功能。

import com.congge.handler.ApiHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;@Configuration
public class ApiRouterFunction {@Beanpublic RouterFunction<ServerResponse> apiRoute(ApiHandler apiHandler){return route(GET("/book/getBookById/{id}"),apiHandler::getBookById).and(route(GET("/book/getNewBooks"),apiHandler::getNewBooks));}}

启动项目之后,我们来访问一下其中的一个接口,效果与普通的接口效果类似。

6.5.1 静态化改造

如果你不希望上面的自定义handler和routerConfig与spring框架耦合的太紧密,也可以将其做成静态化的配置,通过app启动的时候自动注册,只需去掉spring相关的注解,然后在app启动类注册进去即可。

ApiRouterFunction改造,将原本的配置bean方法修改为static 如下代码

public class ApiRouterFunction {public static RouterFunction<ServerResponse> apiRoute(){ApiHandler apiHandler = new ApiHandler();return route(GET("/book/getBookById/{id}"),apiHandler::getBookById).and(route(GET("/book/getNewBooks"),apiHandler::getNewBooks));}}

ApiHandler改造,去掉配置注解,启动类改造如下

public static void main(String[] args) {new SpringApplicationBuilder().sources(FluxApp.class).initializers((ApplicationContextInitializer<GenericApplicationContext>) ctx ->{ctx.registerBean("apiRoute",RouterFunction.class,ApiRouterFunction::apiRoute);}).run(args);}

再次启动后调用相同的接口,仍然可以得到正确的响应结果

七、webflux的使用场景

通过上面关于webflux的使用了解到webflux的强大之处,其实在很多中间件,微服务组件中都随处可见webflux的响应式编程的影子,比如在springcloud gateway网关中,网关作为流量的入口,为了持续提升整体服务的高性能、高吞吐、高并发的请求,在处理请求拦截、路由转发等方面使用webflux。如下这段代码,就是gateway中自定义过滤器的一段配置;

@Component
@Slf4j
public class LogFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {log.info(exchange.getRequest().getPath().value());return chain.filter(exchange);}
}

结合实际经验,对于下面的这些场景,可以考虑使用webflux解决:

  • Spring WebFlux 是一个异步非阻塞式的 Web 框架,所以,它特别适合应用在 IO 密集型的服务中,比如像上面提到的微服务网关这样的应用中;
  • 硬件资源扩充困难,但又希望提升系统整体的吞吐量,可以考虑使用webflux,因为WebFlux 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动;
  • 一些对请求响应时间要求不高,但是并发较大的异步场景;

注意

WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。

八、写在文末

从WebFlux 的发展以及在众多的Java生态组件中广泛使用来看,WebFlux 的流行趋势已经到来,因此掌握WebFlux 的核心原理和思想,在日常工作开发中,在某些特殊的场景下能够提供很好的解决思路,当然WebFlux 涉及到的技术点还有很多,比如对websocket的支持等,有兴趣的同学可以继续参阅相关资料深入学习,本篇到此结束,感谢观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/110620.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mybatis对数据库进行增删查改以及单元测试

这篇写的草率了&#xff0c;是好几天前学到&#xff0c;以后用来自己复习 UserInfo import lombok.Data;Data public class UserInfo {private int id;private String name;private int age;private String email;//LocalDateTime可用于接收 时间}Mapper UserMapper pack…

软考 系统架构设计师系列知识点之软件构件(1)

所属章节&#xff1a; 第2章. 计算机系统基础知识 第3节. 计算机软件 2.3.7 软件构件 1. 概述 构件又称为组件&#xff0c;是一个自包容、可复用的程序集。构建是一个程序集、或者说是一组程序的集合。这个集合可能会以各种方式体现出来&#xff0c;如源程序或二进制代码。这…

2023年中国多功能折叠刀产量、销量及市场规模分析[图]

多功能折叠刀是一种集多种功能于一身的刀具&#xff0c;通常包括切割、开瓶、剥皮、锯木等功能&#xff0c;可以通过折叠和展开的方式来实现不同的功能&#xff0c;具有便携、多用途、安全等特点&#xff0c;广泛应用于户外探险、露营、自驾旅行等场景。 多功能折叠刀行业分类…

Simian使用方法

1.下载 链接1&#xff1a;官网下载 链接2&#xff1a;压缩包 2.操作 1.双击exe启动 2.打开控制台&#xff0c;winR 输入cmd 3.输入操作语句 G:\1111\simian-2.5.10\bin\simian-2.5.10.exe -includes"G:\1111\test\*.cpp" -threshold3 > output.txt G:\1111\si…

利用TypeScript 和 jsdom 库实现自动化抓取数据

以下是一个使用 TypeScript 和 jsdom 库的下载器程序&#xff0c;用于下载zhihu的内容。此程序使用了 duoip.cn/get_proxy 这段代码。 import { JSDOM } from jsdom; import { getProxy } from https://www.duoip.cn/get_proxy;const zhihuUrl https://www.zhihu.com;(async (…

璞华科技再次赋能,助力成都市温江区“码”上维权不烦“薪” !

科技赋能护“薪”行动 “码”上维权不烦“薪” 为保障劳动者工资收入的合法权益&#xff0c;提升人社部门智能化咨询服务能力&#xff0c;2023年10月17日&#xff0c;成都市温江区人力资源和社会保障局发布“码上护薪”小程序&#xff0c;助力劳动者“码”上维权不烦”薪”。…

【Machine Learning】01-Supervised learning

01-Supervised learning 1. 机器学习入门1.1 What is Machine Learning?1.2 Supervised learning1.3 Unsupervised learning 2. Supervised learning2.1 单元线性回归模型2.1.1 Linear Regression Model&#xff08;线性回归模型&#xff09;2.1.2 Cost Function&#xff08;代…

学习编程语言需要熟悉库函数吗?

学习编程语言需要熟悉库函数吗? 我想答案肯定是需要的。 但不是盲目的挨个去记&#xff0c;几乎各个语言的库函数都极为丰富&#xff0c;逐个记忆的话是十分劝退的&#xff0c;而且也不可能全部熟悉&#xff0c;到用的时候该忘还是忘。最近很多小伙伴找我&#xff0c;说想要一…

AArch64 TrustZone

概述 本文我们介绍了 TrustZone 技术。通过CPU内置的硬件强制隔离&#xff0c;TrustZone 提供了一种高效的全系统安全设计。 我们介绍了如下功能&#xff1a;将 TrustZone 技术添加到处理器架构中&#xff0c;内存系统对于 TrustZone 的支持以及典型的软件架构。我们还介绍了…

利用MixProxy自动录制生成Pytest案例:轻松实现测试脚本编写!

前言 进行接口自动化时&#xff0c;有时候往往没有接口文档&#xff0c;或者文档更新并不及时&#xff0c;此时&#xff0c;想要获取相关接口&#xff0c;通过抓包是一种快速便捷的手段。抓包获取到接口后&#xff0c;开始写接口用例&#xff0c;此时需要复制请求url、请求参数…

React拖拽实践

当涉及到前端开发中的用户体验时&#xff0c;拖拽功能是一个常见而重要的需求。在React中&#xff0c;实现拖拽功能可以通过多种方式完成&#xff0c;但通常需要深刻理解React的状态管理、事件处理和DOM操作。本文将探讨React中拖拽的实践&#xff0c;包括基本原理、拖拽库的使…

Flutter 填坑录 (不定时更新)

一&#xff0c;内存爆表 > 图片缓存 /// State基类 class BaseState<T extends StatefulWidget> extends State<T>withAutomaticKeepAliveClientMixin,WidgetHelper,DialogHelper,EventListener {mustCallSupervoid initState() {if (isListenEvent()) {EventMa…

Python中Set()学习

二、set python 的 set 和其他语言类似, 是一个无序不重复元素集, 基本功能包括关系测试和消除重复元素。 set 和 dict 类似,但是 set 不存储 value 值的。 1、set 的创建 创建一个 set,需要提供一个 list 作为输入集合 set1=set([123,456,789]) print(set1)输出结果: …

全开源无加密跨境电商购物网站系统源码(无货源模式+多语言+多货币)

在全球化的时代背景下&#xff0c;跨境电商成为了越来越受欢迎的消费方式&#xff0c;而建立一个源码无加密多语言跨境购物网站系统是一个具有挑战性的任务&#xff0c;但完全可行。以下是这个过程的一些主要步骤&#xff1a; 1. 确定需求和功能规划&#xff1a;先确定网站需要…

IOday7

A进程 #include <head.h> int main(int argc, const char *argv[]) {pid_t cpidfork();if(cpid>0)//父进程向管道文件2写{ int wfd;if((wfdopen("./myfifo2",O_WRONLY))-1){ERR_MSG("open");return -1;} char buf[128]"";while(1){bze…

Python接口自动化 —— token登录(详解)

简介 为了验证用户登录情况以及减轻服务器的压力&#xff0c;减少频繁的查询数据库&#xff0c;使服务器更加健壮。有些登录不是用 cookie 来验证的&#xff0c;是用 token 参数来判断是否登录。token 传参有两种一种是放在请求头里&#xff0c;本质上是跟 cookie 是一样的&am…

MySql 数据库基础概念,基本简单操作及数据类型介绍

文章目录 数据库基础为什么需要数据库&#xff1f;创建数据库mysql架构SQL语句分类编码集修改数据库属性数据库备份 表的基本操作存在时更新&#xff0c;不存在时插入 数据类型日期类型enum和set 数据库基础 以特定的格式保存文件&#xff0c;叫做数据库&#xff0c;这是狭义上…

逐字稿 | 8 视频理解论文串讲(上)【论文精读】

目录 1 自从 Alexnet 之后&#xff0c;对视频理解的研究就从这种手工特征慢慢转移到卷积神经网络了。 ​编辑 1.1Deep video——深度学习时代&#xff0c;使用卷积神经网络去处理视频理解问题的最早期的工作之一 1.2如何把卷积神经网络&#xff0c;从图片识别应用到视频识别…

Nginx详细配置指南

nginx.conf配置 找到Nginx的安装目录下的nginx.conf文件&#xff0c;该文件负责Nginx的基础功能配置。 配置文件概述 Nginx的主配置文件(conf/nginx.conf)按以下结构组织&#xff1a; 配置块功能描述全局块与Nginx运行相关的全局设置events块与网络连接有关的设置http块代理…

Pyecharts绘图教程(1)—— Pyecharts可视化神器基础入门

文章目录 &#x1f3af; 1. 简介1.1 Pyecharts 是什么1.2 Pyecharts 特性 &#x1f3af; 2. 安装2.1 Pyecharts 版本2.2 常用安装方式2.3 安装地图文件&#xff08;可选&#xff09; &#x1f3af; 3. 图表类型3.1 直角坐标系图表3.2 基本图表3.3 树形图表3.4 地理图表3.5 3D图…