一、使用WebFlux入门
- WebFlux整合Mysql
- WebFlux整合ES
- WebFlux整合MongoDB
- WebFlux整合Redis
1、添加依赖
<!-- Spring WebFlux starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

<!-- Auto-configuration for reactive Elasticsearch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

<!-- Auto-configuration for reactive MongoDB -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

<!-- Auto-configuration for reactive Spring Data Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

<!-- Auto-configuration for reactive Spring Data R2DBC -->
<dependency>
    <groupId>org.springframework.boot.experimental</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    <version>0.1.0.M2</version>
</dependency>

<!-- jasync r2dbc-mysql driver -->
<dependency>
    <groupId>com.github.jasync-sql</groupId>
    <artifactId>jasync-r2dbc-mysql</artifactId>
    <version>1.0.11</version>
</dependency>
2、添加配置类
spring:
  data:
    # Elasticsearch settings
    elasticsearch:
      client:
        # Bound to the ReactiveRestClientProperties configuration class
        reactive:
          endpoints: 127.0.0.1:9200 # ES RESTful API address
    # MongoDB settings
    mongodb:
      authentication-database: admin
      database: test
      host: 127.0.0.1
      password: m123
      port: 27017
      username: madmin
配置类
- @EnableReactiveElasticsearchRepositories
- @EnableMongoRepositories
- @EnableTransactionManagement
@Configuration
@EnableReactiveElasticsearchRepositories
@EnableMongoRepositories
@EnableTransactionManagement
public class WebFluxConfiguration {//Redis的传输对象@Beanpublic ReactiveRedisTemplate<String, Object> commonRedisTemplate(ReactiveRedisConnectionFactory factory) {RedisSerializationContext<String, Object> serializationContext =RedisSerializationContext.<String, Object>newSerializationContext(RedisSerializer.string()).value(RedisSerializer.json()) // 创建通用的 GenericJackson2JsonRedisSerializer 作为序列化.build();return new ReactiveRedisTemplate<>(factory, serializationContext);}@Beanpublic ReactiveRedisTemplate<String, UserCacheObject> userRedisTemplate(ReactiveRedisConnectionFactory factory) {RedisSerializationContext<String, UserCacheObject> serializationContext =RedisSerializationContext.<String, UserCacheObject>newSerializationContext(RedisSerializer.string()).value(new Jackson2JsonRedisSerializer<>(UserCacheObject.class)) // 创建专属 UserCacheObject 的 Jackson2JsonRedisSerializer 作为序列化.build();return new ReactiveRedisTemplate<>(factory, serializationContext);}//数据库操作mysql数据库@Beanpublic ConnectionFactory connectionFactory(R2dbcProperties properties) throws URISyntaxException {// 从 R2dbcProperties 中,解析出 host、port、databaseURI uri = new URI(properties.getUrl());String host = uri.getHost();int port = uri.getPort();String database = uri.getPath().substring(1); // 去掉首位的 / 斜杠// 创建 jasync Configuration 配置配置对象com.github.jasync.sql.db.Configuration configuration = new com.github.jasync.sql.db.Configuration(properties.getUsername(), host, port, properties.getPassword(), database);// 创建 JasyncConnectionFactory 对象return new JasyncConnectionFactory(new MySQLConnectionFactory(configuration));}//实物管理器@Beanpublic ReactiveTransactionManager transactionManager(R2dbcProperties properties) throws URISyntaxException {return new R2dbcTransactionManager(this.connectionFactory(properties));}}
4、Controller示例
@RestController
@RequestMapping("/users")
public class UserController {private static final UserDO USER_NULL = new UserDO();@Autowiredprivate UserRepository userRepository;@GetMapping("/list")public Flux<UserVO> list() {// 返回列表return userRepository.findAll().map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));}@GetMapping("/get")public Mono<UserVO> get(@RequestParam("id") Integer id) {// 返回return userRepository.findById(id).map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));}@PostMapping("add")public Mono<Integer> add(UserAddDTO addDTO) {// 查询用户Mono<UserDO> user = userRepository.findByUsername(addDTO.getUsername());// 执行插入return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Integer>>() {@Overridepublic Mono<Integer> apply(UserDO userDO) {if (userDO != USER_NULL) {// 返回 -1 表示插入失败。// 实际上,一般是抛出 ServiceException 异常。因为这个示例项目里暂时没做全局异常的定义,所以暂时返回 -1 啦return Mono.just(-1);}// 将 addDTO 转成 UserDOuserDO = new UserDO().setId((int) (System.currentTimeMillis() / 1000)) // 使用当前时间戳的描述,作为 ID 。.setUsername(addDTO.getUsername()).setPassword(addDTO.getPassword()).setCreateTime(new Date());// 插入数据库return userRepository.insert(userDO).map(UserDO::getId);}});}}
4、Controller
/**
 * Redis-backed user cache endpoints. The un-versioned endpoints use the
 * general-purpose template; the {@code /v2} endpoints use the template typed
 * to {@code UserCacheObject}.
 */
@RestController
@RequestMapping("/users")
public class UserController {

    // ========== General-purpose ReactiveRedisTemplate ==========

    @Autowired
    private ReactiveRedisTemplate<String, Object> commonRedisTemplate;

    @Autowired
    private ReactiveRedisTemplate<String, UserCacheObject> userRedisTemplate;

    /**
     * Reads a cached user through the general-purpose template.
     *
     * @param id user id
     * @return the cached value cast to {@code UserCacheObject}
     */
    @GetMapping("/get")
    public Mono<UserCacheObject> get(@RequestParam("id") Integer id) {
        Mono<Object> cached = commonRedisTemplate.opsForValue().get(genKey(id));
        return cached.map(value -> (UserCacheObject) value);
    }

    /**
     * Stores a user through the general-purpose template.
     *
     * @param user user to cache; its id determines the key
     * @return result of the Redis set operation
     */
    @PostMapping("/set")
    public Mono<Boolean> set(UserCacheObject user) {
        return commonRedisTemplate.opsForValue().set(genKey(user.getId()), user);
    }

    /** Builds the key used by the general-purpose endpoints. */
    private static String genKey(Integer id) {
        return "user::" + id;
    }

    /**
     * Reads a cached user through the typed template.
     *
     * @param id user id
     * @return the cached user
     */
    @GetMapping("/v2/get")
    public Mono<UserCacheObject> getV2(@RequestParam("id") Integer id) {
        return userRedisTemplate.opsForValue().get(genKeyV2(id));
    }

    /**
     * Stores a user through the typed template.
     *
     * @param user user to cache; its id determines the key
     * @return result of the Redis set operation
     */
    @PostMapping("/v2/set")
    public Mono<Boolean> setV2(UserCacheObject user) {
        return userRedisTemplate.opsForValue().set(genKeyV2(user.getId()), user);
    }

    /** Builds the key used by the v2 endpoints. */
    private static String genKeyV2(Integer id) {
        return "user::v2::" + id;
    }
}
6、Controller
@RestController
@RequestMapping("/users")
public class UserController {private static final UserDO USER_NULL = new UserDO();@Autowiredprivate UserRepository userRepository;/*** 查询用户列表*/@GetMapping("/list")public Flux<UserVO> list() {// 返回列表return userRepository.findAll().map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));}/*** 获得指定用户编号的用户*/@GetMapping("/get")public Mono<UserVO> get(@RequestParam("id") Integer id) {// 返回return userRepository.findById(id).map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));}/*** 添加用户*/@PostMapping("add")@Transactionalpublic Mono<Integer> add(UserAddDTO addDTO) {// 查询用户Mono<UserDO> user = userRepository.findByUsername(addDTO.getUsername());// 执行插入return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Integer>>() {@Overridepublic Mono<Integer> apply(UserDO userDO) {if (userDO != USER_NULL) {// 返回 -1 表示插入失败。// 实际上,一般是抛出 ServiceException 异常。因为这个示例项目里暂时没做全局异常的定义,所以暂时返回 -1 啦return Mono.just(-1);}// 将 addDTO 转成 UserDOuserDO = new UserDO().setUsername(addDTO.getUsername()).setPassword(addDTO.getPassword()).setCreateTime(new Date());// 插入数据库return userRepository.save(userDO).flatMap(new Function<UserDO, Mono<Integer>>() {@Overridepublic Mono<Integer> apply(UserDO userDO) {// 如果编号为偶数,抛出异常。if (userDO.getId() % 2 == 0) {throw new RuntimeException("我就是故意抛出一个异常,测试下事务回滚");}// 返回编号return Mono.just(userDO.getId());}});}});}/*** 更新指定用户编号的用户*/@PostMapping("/update")public Mono<Boolean> update(UserUpdateDTO updateDTO) {// 查询用户Mono<UserDO> user = userRepository.findById(updateDTO.getId());// 执行更新return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Boolean>>() {@Overridepublic Mono<Boolean> apply(UserDO userDO) {// 如果不存在该用户,则直接返回 false 失败if (userDO == USER_NULL) {return Mono.just(false);}// 查询用户是否存在return userRepository.findByUsername(updateDTO.getUsername()).defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 
的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<? extends Boolean>>() {@Overridepublic Mono<? extends Boolean> apply(UserDO usernameUserDO) {// 如果用户名已经使用(该用户名对应的 id 不是自己,说明就已经被使用了)if (usernameUserDO != USER_NULL && !Objects.equals(updateDTO.getId(), usernameUserDO.getId())) {return Mono.just(false);}// 执行更新userDO.setUsername(updateDTO.getUsername());userDO.setPassword(updateDTO.getPassword());return userRepository.save(userDO).map(userDO -> true); // 返回 true 成功}});}});}/*** 删除指定用户编号的用户*/@PostMapping("/delete") // URL 修改成 /delete ,RequestMethod 改成 DELETEpublic Mono<Boolean> delete(@RequestParam("id") Integer id) {// 查询用户Mono<UserDO> user = userRepository.findById(id);// 执行删除。这里仅仅是示例,项目中不要物理删除,而是标记删除return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Boolean>>() {@Overridepublic Mono<Boolean> apply(UserDO userDO) {// 如果不存在该用户,则直接返回 false 失败if (userDO == USER_NULL) {return Mono.just(false);}// 执行删除return userRepository.deleteById(id).map(aVoid -> true); // 返回 true 成功}});}}
3、配置类
@Configuration
public class UserRouter {@Beanpublic RouterFunction<ServerResponse> userListRouterFunction() {return RouterFunctions.route(RequestPredicates.GET("/users2/list"),new HandlerFunction<ServerResponse>() {@Overridepublic Mono<ServerResponse> handle(ServerRequest request) {// 查询列表List<UserVO> result = new ArrayList<>();result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));// 返回列表return ServerResponse.ok().bodyValue(result);}});}@Beanpublic RouterFunction<ServerResponse> userGetRouterFunction() {return RouterFunctions.route(RequestPredicates.GET("/users2/get"),new HandlerFunction<ServerResponse>() {@Overridepublic Mono<ServerResponse> handle(ServerRequest request) {// 获得编号Integer id = request.queryParam("id").map(s -> StringUtils.isEmpty(s) ? null : Integer.valueOf(s)).get();// 查询用户UserVO user = new UserVO().setId(id).setUsername(UUID.randomUUID().toString());// 返回列表return ServerResponse.ok().bodyValue(user);}});}@Beanpublic RouterFunction<ServerResponse> demoRouterFunction() {return route(GET("/users2/demo"), request -> ok().bodyValue("demo"));}}
2、编写控制类
- 增删改查使用不同的请求方法Get,Post,Put,Delete实现,减少了Mapping寻址配置
- 返回单条数据使用Mono封装
- 返回集合数据使用Flux封装
/**
 * REST endpoints for cities under {@code /city}; every operation is delegated
 * to {@code CityHandler}.
 */
@RestController
@RequestMapping("/city")
public class CityWebFluxController {

    @Autowired
    private CityHandler cityHandler;

    /**
     * Looks up a single city.
     *
     * @param id city id taken from the path
     * @return the handler's lookup result
     */
    @GetMapping("/{id}")
    public Mono<City> findCityById(@PathVariable("id") Long id) {
        Mono<City> city = cityHandler.findCityById(id);
        return city;
    }

    /**
     * Lists all cities.
     *
     * @return the handler's listing
     */
    @GetMapping
    public Flux<City> findAllCity() {
        Flux<City> cities = cityHandler.findAllCity();
        return cities;
    }

    /**
     * Saves a city.
     *
     * @param city city read from the request body
     * @return the handler's save result
     */
    @PostMapping
    public Mono<Long> saveCity(@RequestBody City city) {
        Mono<Long> saved = cityHandler.save(city);
        return saved;
    }

    /**
     * Updates a city.
     *
     * @param city city read from the request body
     * @return the handler's update result
     */
    @PutMapping
    public Mono<Long> modifyCity(@RequestBody City city) {
        Mono<Long> modified = cityHandler.modifyCity(city);
        return modified;
    }

    /**
     * Deletes a city.
     *
     * @param id city id taken from the path
     * @return the handler's delete result
     */
    @DeleteMapping("/{id}")
    public Mono<Long> deleteCity(@PathVariable("id") Long id) {
        Mono<Long> deleted = cityHandler.deleteCity(id);
        return deleted;
    }
}
3、编写Handler类(类似于Service类)
- 这是一个业务类
- 使用JDK8的lambda表达式实现
@Component
public class CityHandler {private final CityRepository cityRepository;@Autowiredpublic CityHandler(CityRepository cityRepository) {this.cityRepository = cityRepository;}//新增public Mono<Long> save(City city) {return Mono.create(cityMonoSink -> cityMonoSink.success(cityRepository.save(city)));}//修改public Mono<Long> modifyCity(City city) {return Mono.create(cityMonoSink -> cityMonoSink.success(cityRepository.updateCity(city)));}//删除public Mono<Long> deleteCity(Long id) {return Mono.create(cityMonoSink -> cityMonoSink.success(cityRepository.deleteCity(id)));}//查找public Mono<City> findCityById(Long id) {return Mono.create(cityMonoSink -> cityMonoSink.success(cityRepository.findCityById(id)));}//查找public Flux<City> findAllCity() {return Flux.create(cityFluxSink -> {cityRepository.findAll().forEach(city -> cityFluxSink.next(city));cityFluxSink.complete();});}}
二、延伸使用
2、请求头解析
- headers = "myheader=myvalue"
- @RequestHeader("myheader")
- @CookieValue("tid")
/**
 * Matches only requests that carry the header {@code myheader=myvalue}.
 *
 * @param name path segment following {@code /filter/}
 * @return a message echoing the path variable
 */
@GetMapping(path = "/filter/{name}", headers = "myheader=myvalue")
public Mono<String> headerFilter(@PathVariable(name = "name") String name) {
    String message = "request filter: " + name;
    return Mono.just(message);
}

/**
 * Echoes two request headers.
 *
 * @param header    value of the {@code myheader} header
 * @param userAgent value of the {@code user-agent} header
 * @return a message containing both header values
 */
@GetMapping(path = "get")
public Mono<String> getHeader(@RequestHeader("myheader") String header,
                              @RequestHeader("user-agent") String userAgent) {
    String message = "request headers: myheader=" + header + " userAgent=" + userAgent;
    return Mono.just(message);
}

/**
 * Echoes the {@code tid} cookie.
 *
 * @param tid value of the {@code tid} cookie
 * @return a message containing the cookie value
 */
@GetMapping(path = "cookie")
public Mono<String> getCookie(@CookieValue("tid") String tid) {
    String message = "request cookies tid=" + tid;
    return Mono.just(message);
}
3、请求参数
https://spring.hhui.top/spring-blog/2020/08/27/200827-SpringBoot%E7%B3%BB%E5%88%97WebFlux%E4%B9%8BPath%E5%8F%82%E6%95%B0%E8%A7%A3%E6%9E%90%E4%B8%8Eurl%E6%98%A0%E5%B0%84/
- @PathVariable(name = "index")
/**
 * Echoes the numeric path variable.
 *
 * @param index integer taken from the path segment after {@code /basic/}
 * @return a message containing the index
 */
@GetMapping(path = "/basic/{index}")
public Mono<String> basic(@PathVariable(name = "index") int index) {
    String message = "path index: " + index;
    return Mono.just(message);
}
4、访问静态资源
(1)配置静态资源位置
//1、通过注册方式
@SpringBootApplication
public class Application implements WebFluxConfigurer {@Overridepublic void addResourceHandlers(ResourceHandlerRegistry registry) {registry.addResourceHandler("/**").addResourceLocations("classpath:/o2/");}public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
//2、通过路由设置方式
/**
 * Routes {@code GET /index} and {@code GET /s} to two HTML resources on the classpath.
 *
 * @param indexHtml resource at {@code classpath:/index.html}
 * @param sHtml     resource at {@code classpath:/self/s.html}
 * @return router function serving both pages with content type text/html
 */
@Bean
public RouterFunction<ServerResponse> indexRouter(
        @Value("classpath:/index.html") final Resource indexHtml,
        @Value("classpath:/self/s.html") final Resource sHtml) {
    // First route: /index -> index.html
    RouterFunction<ServerResponse> indexRoute = RouterFunctions.route(
            RequestPredicates.GET("/index"),
            request -> ServerResponse.ok().contentType(MediaType.TEXT_HTML).bodyValue(indexHtml));
    // Second route chained after it: /s -> self/s.html
    return indexRoute.andRoute(
            RequestPredicates.GET("/s"),
            request -> ServerResponse.ok().contentType(MediaType.TEXT_HTML).bodyValue(sHtml));
}
6、ES响应控制类
@RestController
@RequestMapping("/users")
public class UserController {private static final UserDO USER_NULL = new UserDO();@Autowiredprivate UserRepository userRepository;@GetMapping("/list")public Flux<UserVO> list() {return userRepository.findAll().map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));}@GetMapping("/get")public Mono<UserVO> get(@RequestParam("id") Integer id) {// 返回return userRepository.findById(id).map(userDO -> new UserVO().setId(userDO.getId()).setUsername(userDO.getUsername()));}@PostMapping("add")public Mono<Integer> add(UserAddDTO addDTO) {Mono<UserDO> user = userRepository.findByUsername(addDTO.getUsername());return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Integer>>() {@Overridepublic Mono<Integer> apply(UserDO userDO) {if (userDO != USER_NULL) {// 返回 -1 表示插入失败。// 实际上,一般是抛出 ServiceException 异常。因为这个示例项目里暂时没做全局异常的定义,所以暂时返回 -1 啦return Mono.just(-1);}// 将 addDTO 转成 UserDOuserDO = new UserDO().setId((int) (System.currentTimeMillis() / 1000)) // 使用当前时间戳的描述,作为 ID 。.setUsername(addDTO.getUsername()).setPassword(addDTO.getPassword()).setCreateTime(new Date());// 插入数据库return userRepository.save(userDO).map(UserDO::getId);}});}@PostMapping("/update")public Mono<Boolean> update(UserUpdateDTO updateDTO) {// 查询用户Mono<UserDO> user = userRepository.findById(updateDTO.getId());// 执行更新return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Boolean>>() {@Overridepublic Mono<Boolean> apply(UserDO userDO) {// 如果不存在该用户,则直接返回 false 失败if (userDO == USER_NULL) {return Mono.just(false);}// 查询用户是否存在return userRepository.findByUsername(updateDTO.getUsername()).defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<? extends Boolean>>() {@Overridepublic Mono<? 
extends Boolean> apply(UserDO usernameUserDO) {// 如果用户名已经使用(该用户名对应的 id 不是自己,说明就已经被使用了)if (usernameUserDO != USER_NULL && !Objects.equals(updateDTO.getId(), usernameUserDO.getId())) {return Mono.just(false);}// 执行更新userDO.setUsername(updateDTO.getUsername());userDO.setPassword(updateDTO.getPassword());return userRepository.save(userDO).map(userDO -> true); // 返回 true 成功}});}});}@PostMapping("/delete") // URL 修改成 /delete ,RequestMethod 改成 DELETEpublic Mono<Boolean> delete(@RequestParam("id") Integer id) {// 查询用户Mono<UserDO> user = userRepository.findById(id);// 执行删除。这里仅仅是示例,项目中不要物理删除,而是标记删除return user.defaultIfEmpty(USER_NULL) // 设置 USER_NULL 作为 null 的情况,否则 flatMap 不会往下走.flatMap(new Function<UserDO, Mono<Boolean>>() {@Overridepublic Mono<Boolean> apply(UserDO userDO) {// 如果不存在该用户,则直接返回 false 失败if (userDO == USER_NULL) {return Mono.just(false);}// 执行删除return userRepository.deleteById(id).map(aVoid -> true); // 返回 true 成功}});}}