1. 项目结构
2. Maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.2</version><relativePath/> <!-- lookup parent from repository --></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><!-- https://mvnrepository.com/artifact/io.github.resilience4j/resilience4j-spring-boot3 --><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-spring-boot3</artifactId><version>2.2.0</version></dependency><!-- Redis cache dependency --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>
注:Resilience4j 需要 AOP(面向切面编程)相关的依赖,因为它利用 AOP 来动态地拦截和处理方法调用。
3. application.yml
spring:application:name: spring_resilience4j # Spring Boot 应用程序名称cache:type: redis # 缓存类型,使用 Redisdata:redis:host: 192.168.186.77 # Redis 服务器主机地址port: 6379 # Redis 服务器端口号resilience4j:timelimiter:instances:timeoutService:timeoutDuration: 2s # 超时时间为2秒circuitbreaker:instances:circuitbreakerService:slidingWindowSize: 10 # 滑动窗口大小为10failureRateThreshold: 50 # 失败率阈值为50%waitDurationInOpenState: 10s # 打开状态等待时间为10秒ratelimiter:instances:ratelimiterService:limitForPeriod: 1 # 每个周期允许的最大请求数为1limitRefreshPeriod: 10s # 速率限制刷新周期为10秒timeoutDuration: 500ms # 等待许可的超时时间为500毫秒retry:instances:retryService:maxAttempts: 5 # 最大重试次数为5次waitDuration: 1000ms # 重试间隔为1000毫秒(1秒)enableExponentialBackoff: true # 启用指数回退exponentialBackoffMultiplier: 1.5 # 指数回退倍数为1.5retryExceptions:- java.lang.RuntimeException # 需要重试的异常类型bulkhead:instances:bulkheadService:maxConcurrentCalls: 5 # 批隔离允许的最大并发调用数为5maxWaitDuration: 1s # 等待许可的最大时间为1秒
4. SpringResilience4jApplication.java
package org.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
@SpringBootApplication
@EnableCaching //开启缓存
public class SpringResilience4jApplication {public static void main(String[] args) {SpringApplication.run(SpringResilience4jApplication.class, args);}
}
5. Common.java
package org.example.controller;import org.example.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.CompletableFuture;@RestController
public class Common {@AutowiredTimeoutService timeoutService;@AutowiredRateLimiterService rateLimiterService;@AutowiredCircuitBreakerService circuitBreakerService;@AutowiredRetryService retryService;@AutowiredBulkheadService bulkheadService;//模拟超时@GetMapping("/timeout")public CompletableFuture<String> timeout() {return timeoutService.timeoutExample();}//模拟限速@GetMapping("/rateLimiter")public CompletableFuture<String> rateLimiter() {return rateLimiterService.rateLimiterExample();}//模拟回退@GetMapping("/circuitBreaker")public ResponseEntity<String> circuitBreaker() {return ResponseEntity.ok(circuitBreakerService.CircuitBreaker());}//模拟重试@GetMapping("/retry/{id}")public String getItemById(@PathVariable String id) {return retryService.getItemById(id);}// 模拟批隔离@GetMapping("/process/{id}")public String processRequest(@PathVariable String id) {return bulkheadService.processRequest(id);}
}
6. BulkheadService.java(批隔离)
package org.example.service;import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;@Service
public class BulkheadService {private static final Logger logger = LoggerFactory.getLogger(BulkheadService.class);@Bulkhead(name = "bulkheadService", fallbackMethod = "fallback")public String processRequest(String id) {logger.info("Processing request: {}", id);try {// 模拟处理延迟Thread.sleep(10000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}return "Processed: " + id;}// 回退方法public String fallback(String id, BulkheadFullException ex) {logger.error("Bulkhead is full. Falling back for request: {}",id,ex);return "Bulkhead is full. Please try again later.";}
}
application.yml对应的配置信息:
bulkhead:instances:bulkheadService:maxConcurrentCalls: 5maxWaitDuration: 1s
解释:
-
maxConcurrentCalls:
- 最大并发调用数。在此示例中,设置为 5,表示同一时间最多允许 5 个并发调用。如果超过这个数量,额外的调用将被阻塞,直到有空闲的调用资源。
-
maxWaitDuration:
- 在被阻塞的调用被拒绝之前,最大等待时间。在此示例中,设置为 1 秒,表示如果一个调用在 1 秒内没有获取到许可,将会被拒绝并抛出
BulkheadFullException
。
- 在被阻塞的调用被拒绝之前,最大等待时间。在此示例中,设置为 1 秒,表示如果一个调用在 1 秒内没有获取到许可,将会被拒绝并抛出
结果:
先启动(目录12BulkheadTest测试类)模拟并发。
出现该结果证明并发调用数已经满。
7. CircuitBreakerService.java(熔断)
package org.example.service;import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.springframework.stereotype.Service;@Service
public class CircuitBreakerService {@CircuitBreaker(name = "circuitbreakerService", fallbackMethod = "fallback")public String CircuitBreaker() {if (Math.random() > 0.5) {throw new RuntimeException("It is Failure!");}return "It is Successfully!";}public String fallback(Throwable ex) {return "Fallback : " + ex.getMessage();}
}
application.yml对应的配置信息:
circuitbreaker:instances:circuitbreakerService:slidingWindowSize: 10failureRateThreshold: 50waitDurationInOpenState: 10s
解释:
-
slidingWindowSize:
- 滑动窗口大小。在此示例中,设置为10,表示
CircuitBreaker
将根据最近的10个调用的结果来计算失败率。
- 滑动窗口大小。在此示例中,设置为10,表示
-
failureRateThreshold:
- 失败率阈值。在此示例中,设置为 50,表示如果滑动窗口中的调用失败率超过 50%,
CircuitBreaker
将打开(短路)。
- 失败率阈值。在此示例中,设置为 50,表示如果滑动窗口中的调用失败率超过 50%,
-
waitDurationInOpenState:
CircuitBreaker
打开状态的等待时间。在此示例中,设置为10秒,表示CircuitBreaker
在打开状态下会保持10秒,然后进入半开状态以重新测试服务的可用性。
-
闭合状态(Closed):
- 默认状态,所有请求正常通过并进行监控。
- 如果失败率超过阈值,进入打开状态。
-
打开状态(Open):
- 在打开状态下,所有请求将立即失败,不会调用实际的方法。
- 在等待时间结束后,进入半开状态。
-
半开状态(Half-Open):
- 在半开状态下,会允许部分请求通过以测试服务是否恢复。
- 如果请求成功率恢复正常,恢复到闭合状态;否则重新进入打开状态。
结果:
监听消息:
CircuitBreaker Event: 2024-07-29T23:12:41.621862900+08:00[Asia/Shanghai]: CircuitBreaker 'circuitbreakerService' recorded an error: 'java.lang.RuntimeException: It is Failure!'. Elapsed time: 0 ms
8. RateLimiterService.java(限速)
package org.example.service;import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class RateLimiterService {@RateLimiter(name = "ratelimiterService", fallbackMethod = "fallbackRateLimiter")public CompletableFuture<String> rateLimiterExample() {return CompletableFuture.supplyAsync(() -> "It is Success!");}public CompletableFuture<String> fallbackRateLimiter(Exception e) {return CompletableFuture.completedFuture("Too many requests");}
}
application.yml对应的配置信息 :
ratelimiter:instances:ratelimiterService:limitForPeriod: 1limitRefreshPeriod: 10stimeoutDuration: 500ms
解释:
-
limitForPeriod:
- 指定每个刷新周期内允许的最大请求数。在此示例中,设置为1,表示每个刷新周期只允许1个请求通过。
-
limitRefreshPeriod:
- 指定速率限制器的刷新周期。在此示例中,设置为10秒,表示每10秒刷新一次,重置允许的请求数。
-
timeoutDuration:
- 指定在速率限制器中等待许可的最大时间。如果请求在
timeoutDuration
内未获得许可,则会被拒绝。在此示例中,设置为 500 毫秒。
- 指定在速率限制器中等待许可的最大时间。如果请求在
结果(10s内发送两次请求):
监听消息:
RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:37.189104500+08:00[Asia/Shanghai]}
RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:38.995416700+08:00[Asia/Shanghai]}
RateLimiter Event: RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:40.927729300+08:00[Asia/Shanghai]}
9. Resilience4jEventListener.java(事件监听)
package org.example.service;import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryRegistry;
import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterRegistry;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Resilience4jEventListener {private static final Logger logger = LoggerFactory.getLogger(Resilience4jEventListener.class);private final RetryRegistry retryRegistry;private final TimeLimiterRegistry timeLimiterRegistry;private final CircuitBreakerRegistry circuitBreakerRegistry;private final RateLimiterRegistry rateLimiterRegistry;@Autowiredpublic Resilience4jEventListener(CircuitBreakerRegistry circuitBreakerRegistry,RateLimiterRegistry rateLimiterRegistry,RetryRegistry retryRegistry,TimeLimiterRegistry timeLimiterRegistry) {this.circuitBreakerRegistry = circuitBreakerRegistry;this.rateLimiterRegistry = rateLimiterRegistry;this.retryRegistry = retryRegistry;this.timeLimiterRegistry = timeLimiterRegistry;}@PostConstructpublic void postConstruct() {// 注册 Retry 事件监听器Retry retry = retryRegistry.retry("retryService");retry.getEventPublisher().onEvent(event -> logger.info("Retry event: {}", event));// 注册 CircuitBreaker 事件监听器CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("circuitbreakerService");circuitBreaker.getEventPublisher().onEvent(event -> logger.info("CircuitBreaker Event: {}", event));// 注册 RateLimiter 事件监听器RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("ratelimiterService");rateLimiter.getEventPublisher().onEvent(event -> logger.info("RateLimiter Event: {}", event));// 注册 TimeLimiter 事件监听器TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter("timeoutService");timeLimiter.getEventPublisher().onEvent(event -> logger.info("TimeLimiter Event: {}", event));}
}
调试运行的信息如下:
2024-07-29T23:02:39.073+08:00 INFO 27996 --- [pool-2-thread-1] o.e.service.Resilience4jEventListener : TimeLimiter Event: 2024-07-29T23:02:39.073307500+08:00[Asia/Shanghai]: TimeLimiter 'timeoutService' recorded a timeout exception.
2024-07-29T23:03:37.189+08:00 INFO 27996 --- [nio-8080-exec-4] o.e.service.Resilience4jEventListener : RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:37.189104500+08:00[Asia/Shanghai]}
2024-07-29T23:03:38.995+08:00 INFO 27996 --- [nio-8080-exec-5] o.e.service.Resilience4jEventListener : RateLimiter Event: RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:38.995416700+08:00[Asia/Shanghai]}
2024-07-29T23:03:40.927+08:00 INFO 27996 --- [nio-8080-exec-6] o.e.service.Resilience4jEventListener : RateLimiter Event: RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='ratelimiterService', creationTime=2024-07-29T23:03:40.927729300+08:00[Asia/Shanghai]}
2024-07-29T23:12:41.623+08:00 INFO 27996 --- [nio-8080-exec-9] o.e.service.Resilience4jEventListener : CircuitBreaker Event: 2024-07-29T23:12:41.621862900+08:00[Asia/Shanghai]: CircuitBreaker 'circuitbreakerService' recorded an error: 'java.lang.RuntimeException: It is Failure!'. Elapsed time: 0 ms
2024-07-29T23:23:15.612+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 2
2024-07-29T23:27:22.607+08:00 INFO 27996 --- [nio-8080-exec-6] org.example.service.RetryService : Fetching item with id 3
2024-07-29T23:34:40.837+08:00 INFO 27996 --- [nio-8080-exec-2] org.example.service.RetryService : Fetching item with id 4
2024-07-29T23:34:42.973+08:00 INFO 27996 --- [nio-8080-exec-1] org.example.service.RetryService : Fetching item with id 5
2024-07-29T23:34:44.777+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:44.793+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:44.793527800+08:00[Asia/Shanghai]: Retry 'retryService', waiting PT1S until attempt '1'. Last attempt failed with exception 'java.lang.RuntimeException: Simulated database error'.
2024-07-29T23:34:45.817+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:45.822+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:45.822567400+08:00[Asia/Shanghai]: Retry 'retryService' recorded a successful retry attempt. Number of retry attempts: '1', Last exception was: 'java.lang.RuntimeException: Simulated database error'.
注:该类进行事件监听,便于调试。
10. RetryService.java(缓存+重试)
package org.example.service;import io.github.resilience4j.retry.annotation.Retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;@Service
public class RetryService {private static final Logger logger = LoggerFactory.getLogger(RetryService.class);@Cacheable(value = "items", key = "#id")@Retry(name = "retryService", fallbackMethod = "fallback")public String getItemById(String id) {logger.info("Fetching item with id {}", id);// 模拟数据库调用,可能会引发异常if (Math.random() > 0.8) {throw new RuntimeException("Simulated database error");}return "Cache " + id;}// 回退方法public String fallback(String id, RuntimeException e) {return "Fallback : " + id;}
}
注:该类简单的使用redis缓存,重试机制可以自动重新执行失败的操作。通过配置重试次数和重试间隔,可以在遇到暂时性错误时增加操作成功的机会。
application.yml对应的配置信息:
retry:instances:retryService:maxAttempts: 5waitDuration: 1000enableExponentialBackoff: trueexponentialBackoffMultiplier: 1.5retryExceptions:- java.lang.RuntimeException
解释:
-
maxAttempts:
- 最大重试次数。在此示例中,设置为 5,表示如果方法调用失败,将最多重试 5 次。
-
waitDuration:
- 重试之间的等待时间(以毫秒为单位)。在此示例中,设置为 1000 毫秒(1 秒),表示每次重试之间等待 1 秒。
-
enableExponentialBackoff:
- 启用指数回退策略。设置为
true
表示启用指数回退。
- 启用指数回退策略。设置为
-
exponentialBackoffMultiplier:
- 指数回退的倍数。在此示例中,设置为1.5,表示每次重试之间的等待时间将乘以 1.5。
-
retryExceptions:
- 需要重试的异常类型列表。在此示例中,指定了
java.lang.RuntimeException
,表示当方法抛出RuntimeException
时会进行重试。
- 需要重试的异常类型列表。在此示例中,指定了
结果:
直接访问(保证redis正常运行),然后不断的尝试把路径1依次累加,然后发送请求,直到出现监听消息的内容即可看到重试,当方法上使用了 @Cacheable
注解时,如果请求的缓存存在且未过期,那么该方法不会实际执行,而是直接返回缓存中的数据。这意味着在这种情况下,@Retry
注解可能不会生效,因为方法调用不会实际发生,导致没有机会触发重试机制。
监听消息:
2024-07-29T23:23:15.612+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 2
2024-07-29T23:27:22.607+08:00 INFO 27996 --- [nio-8080-exec-6] org.example.service.RetryService : Fetching item with id 3
2024-07-29T23:34:40.837+08:00 INFO 27996 --- [nio-8080-exec-2] org.example.service.RetryService : Fetching item with id 4
2024-07-29T23:34:42.973+08:00 INFO 27996 --- [nio-8080-exec-1] org.example.service.RetryService : Fetching item with id 5
2024-07-29T23:34:44.777+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:44.793+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:44.793527800+08:00[Asia/Shanghai]: Retry 'retryService', waiting PT1S until attempt '1'. Last attempt failed with exception 'java.lang.RuntimeException: Simulated database error'.
2024-07-29T23:34:45.817+08:00 INFO 27996 --- [nio-8080-exec-3] org.example.service.RetryService : Fetching item with id 6
2024-07-29T23:34:45.822+08:00 INFO 27996 --- [nio-8080-exec-3] o.e.service.Resilience4jEventListener : Retry event: 2024-07-29T23:34:45.822567400+08:00[Asia/Shanghai]: Retry 'retryService' recorded a successful retry attempt. Number of retry attempts: '1', Last exception was: 'java.lang.RuntimeException: Simulated database error'.
11. TimeoutService.java(超时)
package org.example.service;import io.github.resilience4j.timelimiter.annotation.TimeLimiter;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;@Service
public class TimeoutService {@TimeLimiter(name = "timeoutService", fallbackMethod = "fallback")public CompletableFuture<String> timeoutExample() {return CompletableFuture.supplyAsync(() -> {try {Thread.sleep(5000); // 模拟长时间处理,这里设置为5秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}return "Success";});}public CompletableFuture<String> fallback(Throwable t) {return CompletableFuture.completedFuture("fallback: timeout!");}
}
application.yml对应的配置信息:
timelimiter:instances:timeoutService:timeoutDuration: 2s
解释:设置为 2 秒。如果方法调用在 2 秒内未完成,TimeLimiter
会中断调用并执行回退方法。
结果:
监听消息:
TimeLimiter Event: 2024-07-29T23:02:39.073307500+08:00[Asia/Shanghai]: TimeLimiter 'timeoutService' recorded a timeout exception.
12. BulkheadTest.java(批隔离测试)
package org.example.test;import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class BulkheadTest {public static void main(String[] args) {// 创建一个固定大小的线程池,线程数为6ExecutorService executor = Executors.newFixedThreadPool(6);// 初始化一个 CountDownLatch,用于控制所有任务的开始CountDownLatch latch = new CountDownLatch(1);// 提交6个任务到线程池for (int i = 1; i <= 6; i++) {final int id = i;executor.submit(() -> {try {// 等待 latch 释放,确保所有任务同时开始latch.await();// 创建 URL 对象,指定请求路径URL url = new URL("http://localhost:8080/process/" + id);// 打开连接HttpURLConnection conn = (HttpURLConnection) url.openConnection();// 设置请求方法为 GETconn.setRequestMethod("GET");// 获取响应代码int responseCode = conn.getResponseCode();// 打印响应代码System.out.println("Response Code for request " + id + ": " + responseCode);} catch (Exception e) {// 打印异常信息e.printStackTrace();}});}// 释放 latch,启动所有任务latch.countDown();// 关闭线程池executor.shutdown();}
}
注:该类用于模拟批隔离,策略限制了同时处理的并发调用数量,确保系统部分组件的问题不会导致整个系统的瘫痪。
13. 总结
通过Resilience4j+Redis实现超时检测,限速访问,以及重试,还有熔断回退和批隔离等简单的案例模拟,仅供学习交流使用。