前言
在真正测试 Divide 插件时,想要知道后端服务(以下称为 Client)是如何将自己的信息注册到管理台(以下称为 Client)。这里后端服务用的是 shenyu 自带的 http 的例子,项目名字为 shenyu-examples-http。
下图描述了本文研究的内容——服务注册时 Client端向 Admin 注册的数据同步——在 shenyu 架构中处于什么位置。红色部分都是我自己加的,在官网的图中没有。
阅读准备
Disruptor入门及应用
正文
Client事件监听器监听本地的 Context 的刷新事件
当 Client (Spring 应用)依赖注入后,Spring 框架会刷新上下文 Context,这时,shenyu 自定义的一个监听 ContextRefreshedEvent 的监听器 SpringMvcClientEventListener (AbstractContextRefreshedEventListener 的子类)会触发 onApplicationEvent 方法。
- AbstractContextRefreshedEventListener.onApplicationEvent()
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {// ...@Overridepublic void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {context = event.getApplicationContext();// 1. 拿到 beansMap<String, T> beans = getBeans(context);if (MapUtils.isEmpty(beans)) {return;}// 2. 原子地设置 registered 为 trueif (!registered.compareAndSet(false, true)) {return;}if (isDiscoveryLocalMode) {// 3. 如果是“本地发现”模式,发布用于注册 URI 的 DTOpublisher.publishEvent(buildURIRegisterDTO(context, beans));}// 4. 处理每个 bean,具体是发布 bean 的注册信息给 Disruptor 的 QueueConsumerbeans.forEach(this::handle);// 5. apiModules 的 key 是 beanName,value 是 bean 的成员变量Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);// 6. 处理每个 apiModules,具体是发布 apiModules 的注册信息给 Disruptor 的 QueueConsumerapiModules.forEach((k, v) -> handleApiDoc(v, beans));}protected void handle(final String beanName, final T bean) {// ...}private void handleApiDoc(final Object bean, final Map<String, T> beans) {// ...}
}
从 SpringMvcClientEventListener.getBeans() 拿到 Beans
- SpringMvcClientEventListener.java
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {// ...private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();@Overrideprotected Map<String, Object> getBeans(final ApplicationContext context) {// Filter out// isFull 这个 Boolean 值代表的是:是否代理整个服务,目前适用于 SpringMvc/SpringCouldif (Boolean.TRUE.equals(isFull)) {// 在全代理模式下,发布一个事件,这个事件包含了服务的元数据,用于注册服务getPublisher().publishEvent(MetaDataRegisterDTO.builder().contextPath(getContextPath()) // 设置服务的上下文路径.addPrefixed(addPrefixed) // 设置是否添加前缀.appName(getAppName()) // 设置应用名称.path(UriComponentsBuilder.fromUriString(PathUtils.decoratorPathWithSlash(getContextPath()) + EVERY_PATH).build().encode().toUriString())// 设置服务的路径,这里使用了 UriComponentsBuilder 来构建URI,将上下文路径装饰后加上一个通配符,代表匹配所有路径.rpcType(RpcTypeEnum.HTTP.getName()) // 设置远程调用类型为 HTTP.enabled(true) // 设置服务为启用状态.ruleName(getContextPath()) // 使用上下文路径作为规则名称.build());LOG.info("init spring mvc client success with isFull mode");// 发布一个 URI 注册的事件,传入空的映射作为参数publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));return Collections.emptyMap();}// shenyu-examples-http 用的不是全代理模式,因为 isFull 为 false,此时直接返回带 Controller 注解的 beanreturn context.getBeansWithAnnotation(Controller.class);}
}
publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));
发布一个 URI 注册的事件,传入空的映射作为参数。
ShenyuClientRegisterEventPublisher 给 Client 端的 Disruptor 的 QueueConsumer 发布要向 Admin 注册的数据(是的,此时还没传给 Admin,还停留在 Client 端)
- ShenyuClientRegisterEventPublisher.publishEvent() 调用 DisruptorProvider.onData() 传递数据
public class ShenyuClientRegisterEventPublisher {// ...private DisruptorProviderManage<DataTypeParent> providerManage;public void publishEvent(final DataTypeParent data) {DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();// data 传给 Disruptor provider provider.onData(data);}
}
-
DisruptorProvider 传递给 RingBuffer.publishEvent(),最终将注册的信息发布给 Diruptor 的 QueueConsumer。
ps: Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。
public class DisruptorProvider<T> {// ...private final RingBuffer<DataEvent<T>> ringBuffer;private final boolean isOrderly;private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);// ...public void onData(final T data) {if (isOrderly) {throw new IllegalArgumentException("The current provider is of orderly type. Please use onOrderlyData() method.");}try {// 由 ringBuffer 发布事件ringBuffer.publishEvent(translatorOneArg, data);} catch (Exception ex) {logger.error("ex", ex);}}
}
由 QueueConsumer.onEvent() 接收 RingBuffer.publishEvent() 发布的事件,并进行处理
- 从 DisruptorProviderManage.startup 的源码中可以看到,在创建 Disruptor 时,线程池 OrderlyExecutor 被传进了 QueueConsumer,
public class DisruptorProviderManage<T> {// ...private final Integer consumerSize;private final QueueConsumerFactory<T> consumerFactory;// ...public void startup(final boolean isOrderly) {// 创建一个定制的线程池,用于消费者OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());int newConsumerSize = this.consumerSize;EventFactory<DataEvent<T>> eventFactory;// 根据是否有序来调整消费者数量和选择事件工厂if (isOrderly) {// 有序模式下,消费者数量设为1,使用有序的事件工厂newConsumerSize = 1;eventFactory = new OrderlyDisruptorEventFactory<>();} else {// 无序模式下,使用默认的事件工厂eventFactory = new DisruptorEventFactory<>();}// 创建Disruptor实例,配置其基本参数Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,size,DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),ProducerType.MULTI,new BlockingWaitStrategy());// 创建消费者数组,根据newConsumerSize指定的大小@SuppressWarnings("all")QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];for (int i = 0; i < newConsumerSize; i++) {consumers[i] = new QueueConsumer<>(executor, consumerFactory);}// 将消费者注册到Disruptor,使用工作池模式disruptor.handleEventsWithWorkerPool(consumers);// 设置默认的异常处理器,这里选择忽略异常disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());// 启动Disruptordisruptor.start();// 获取Disruptor的环形缓冲区,用于发布事件RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();// 创建并存储DisruptorProvider实例,用于向Disruptor发布事件provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);}
}
- 当接收到一个事件时,QueueConsumer 将任务交给线程池去处理事件,处理事件的 Runnable 接口由工厂 factory 产生。
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {// ...private final QueueConsumerFactory<T> factory;// ...@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {ThreadPoolExecutor executor = orderly(t);QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();queueConsumerExecutor.setData(t.getData());// help gct.setData(null);executor.execute(queueConsumerExecutor);}}
}
- QueueConsumerExecutor 在 Client 端的消费者执行器 RegisterClientConsumerExecutor
/*** The type Consumer executor.*/
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {this.subscribers = new EnumMap<>(executorSubscriberMap);}@Override// run 接口继承自 QueueConsumerExecutor,而 QueueConsumerExecutor 继承自 Runnablepublic void run() {final T data = getData();// subscribers 拿到 ExecutorTypeSubscriber 去处理数据 datasubscribers.get(data.getType()).executor(Lists.newArrayList(data));}/*** The type Register client executor factory.*/public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {@Overridepublic RegisterClientConsumerExecutor<T> create() {Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers().stream()// 将 AbstractQueueConsumerFactory.getSubscribers()// 接口返回的 ExecutorSubscriber<T> 转为 ExecutorTypeSubscriber<T>,// 其带有 getType 接口.map(e -> (ExecutorTypeSubscriber<T>) e).collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));return new RegisterClientConsumerExecutor<>(map);}@Overridepublic String fixName() {return "shenyu_register_client";}}
}
ExecutorTypeSubscriber 继承自 ExecutorSubscriber :
public interface ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T> {`
从下图的 ExecutorTypeSubscriber 接口的实现类可以看到,在 Client 端有 3 个 Subscriber
我们这个例子看的是URI,所以就以 ShenyuClientURIExecutorSubscriber 举例。
数据交由 ShenyuClientURIExecutorSubscriber 执行处理
- ShenyuClientURIExecutorSubscriber.execute()
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {// ...private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {for (URIRegisterDTO uriRegisterDTO : dataList) {Stopwatch stopwatch = Stopwatch.createStarted();while (true) {// 连得上就跳出死循环try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {break;} catch (IOException e) {long sleepTime = 1000;// maybe the port is delay exposedif (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {LOG.error("host:{}, port:{} connection failed, will retry",uriRegisterDTO.getHost(), uriRegisterDTO.getPort());// If the connection fails for a long time, Increase sleep timeif (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {sleepTime = 10000;}}try {TimeUnit.MILLISECONDS.sleep(sleepTime);} catch (InterruptedException ex) {LOG.error("interrupted when sleep", ex);}}}// 1. 延迟应用关闭时的其他钩子ShenyuClientShutdownHook.delayOtherHooks();// 2. 给 Admin 端发送 DTO 注册信息shenyuClientRegisterRepository.persistURI(uriRegisterDTO);// 3. 向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息ShutdownHookManager.get().addShutdownHook(new Thread(() -> {final URIRegisterDTO offlineDTO = new URIRegisterDTO();BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);offlineDTO.setEventType(EventType.OFFLINE);// 给 Admin 端发送下线 DTOshenyuClientRegisterRepository.offline(offlineDTO);}), 2);}}
}
有三个方法需要说明:
ShenyuClientShutdownHook.delayOtherHooks()
延迟应用关闭时的其他钩子ShenyuClientRegisterRepository.persistURI()
给 Admin 端发送 DTO 注册信息ShutdownHookManager.get().addShutdownHook()
向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
延迟应用关闭时的其他钩子
-
ShenyuClientShutdownHook.delayOtherHooks()
- 利用 CAS 不加锁地确保并发时
TakeoverOtherHooksThread
线程只被运行一次 - 一个接管其他钩子的线程
- 利用 CAS 不加锁地确保并发时
public class ShenyuClientShutdownHook {// ...private static final AtomicBoolean DELAY = new AtomicBoolean(false);private static String hookNamePrefix = "ShenyuClientShutdownHook";private static AtomicInteger hookId = new AtomicInteger(0);private static Properties props;private static IdentityHashMap<Thread, Thread> delayHooks = new IdentityHashMap<>();private static IdentityHashMap<Thread, Thread> delayedHooks = new IdentityHashMap<>();// ..../*** Delay other shutdown hooks.*/public static void delayOtherHooks() {// 1. 利用 CAS 不加锁地确保并发时 TakeoverOtherHooksThread 线程只被运行一次if (!DELAY.compareAndSet(false, true)) {return;}// 2. 一个接管其他钩子的线程TakeoverOtherHooksThread thread = new TakeoverOtherHooksThread();thread.start();}/*** Delay other shutdown hooks thread.*/private static class TakeoverOtherHooksThread extends Thread {@Override// 1. 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销public void run() {int shutdownWaitTime = Integer.parseInt(props.getProperty("shutdownWaitTime", "3000"));int delayOtherHooksExecTime = Integer.parseInt(props.getProperty("delayOtherHooksExecTime", "2000"));IdentityHashMap<Thread, Thread> hooks = null;try {// 2. 通过反射拿到应用关闭时的所有钩子Class<?> clazz = Class.forName(props.getProperty("applicationShutdownHooksClassName", "java.lang.ApplicationShutdownHooks"));Field field = clazz.getDeclaredField(props.getProperty("applicationShutdownHooksFieldName", "hooks"));field.setAccessible(true);hooks = (IdentityHashMap<Thread, Thread>) field.get(clazz);} catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) {LOG.error(ex.getMessage(), ex);}long s = System.currentTimeMillis();// 3. 限制处理钩子的时间在 delayOtherHooksExecTime 之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?// GPT:// 答:1. 避免死锁或长时间阻塞// 2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程// 3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。// 在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。// 确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。while (System.currentTimeMillis() - s < delayOtherHooksExecTime) {for (Iterator<Thread> iterator = Objects.requireNonNull(hooks).keySet().iterator(); iterator.hasNext();) {Thread hook = iterator.next();// 4. 用于延迟执行原本钩子的钩子不必再延迟,所以跳过if (hook.getName().startsWith(hookNamePrefix)) {continue;}// 5. 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过if (delayHooks.containsKey(hook) || delayedHooks.containsKey(hook)) {continue;}Thread delayHook = new Thread(() -> {LOG.info("sleep {}ms", shutdownWaitTime);try {// 6. 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子TimeUnit.MILLISECONDS.sleep(shutdownWaitTime);} catch (InterruptedException ex) {LOG.error(ex.getMessage(), ex);}hook.run();}, hook.getName());delayHooks.put(delayHook, delayHook);// 7. 从原本的钩子 map 中移除这个原本要执行的钩子,即 delayHookiterator.remove();}for (Iterator<Thread> iterator = delayHooks.keySet().iterator(); iterator.hasNext();) {Thread delayHook = iterator.next();// 8. 向运行时加入用来延迟执行原本钩子的钩子,即 delayedHooksRuntime.getRuntime().addShutdownHook(delayHook);// 9. 加入已处理过的钩子 map,delayedHooks.put(delayHook, delayHook);iterator.remove();LOG.info("hook {} will sleep {}ms when it start", delayHook.getName(), shutdownWaitTime);}try {// 10. 睡眠 100ms,目的是?// GPT:// 答:1. 减少CPU使用率// 2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException ex) {LOG.error(ex.getMessage(), ex);}}// 帮助 GChookNamePrefix = null;hookId = new AtomicInteger(0);props = null;delayHooks = null;delayedHooks = null;}}
}
-
TakeoverOtherHooksThread.run()
代码如上面给出的:
- 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
- 通过反射拿到应用关闭时的所有钩子
- 限制处理钩子的时间在
delayOtherHooksExecTime
之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
GPT:
答:
1. 避免死锁或长时间阻塞
2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。 在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。 - 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
- 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
- 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
- 从原本的钩子 map 中移除这个原本要执行的钩子,即
delayHook
- 向运行时加入用来延迟执行原本钩子的钩子,即
delayedHooks
- 加入已处理过的钩子 map
- 睡眠 100ms,目的是?
GPT:
答:
1. 减少CPU使用率
2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
给 Admin 端发送 DTO 注册信息
-
ShenyuClientRegisterRepository.persistURI()
ShenyuClientRegisterRepository
、FailbackRegistryRepository
、HttpClientRegisterRepository
继承关系如下图 -
ShenyuClientRegisterRepository.persistURI()
/*** Shenyu client register repository.*/
@SPI
public interface ShenyuClientRegisterRepository {/*** Init.** @param config the config*/default void init(ShenyuRegisterCenterConfig config) {}/*** Persist metadata.** @param metadata metadata*/void persistInterface(MetaDataRegisterDTO metadata);/*** Persist uri.** @param registerDTO the register dto*/default void persistURI(URIRegisterDTO registerDTO) {}/*** Node active offline when shutdown.** @param offlineDTO the offline dto*/default void offline(URIRegisterDTO offlineDTO) {}/*** persistApiDoc.* @param apiDocRegisterDTO apiDocRegisterDTO*/default void persistApiDoc(ApiDocRegisterDTO apiDocRegisterDTO) {}/*** closeRepository.* If the close method is used, Spring will call it by default when the bean is destroyed,* So its method name is closeRepository to avoid being called by default when the bean is destroyed.*/default void closeRepository() {}
}
-
FailbackRegistryRepository.persistURI()
这里同样用到了模板方法,
doPersistURI
交由子类HttpClientRegisterRepository
实现
public abstract class FailbackRegistryRepository implements ShenyuClientRegisterRepository {// ... @Overridepublic void persistURI(final URIRegisterDTO registerDTO) {try {// 1. 同样是模板方法,交由子类 HttpClientRegisterRepository 实现this.doPersistURI(registerDTO);} catch (Exception ex) {//If a failure occurs, it needs to be added to the retry list.logger.warn("Failed to persistURI {}, cause:{}", registerDTO, ex.getMessage());this.addFailureUriDataRegister(registerDTO);}}
}
-
HttpClientRegisterRepository.doPersistURI()
- 如果端口已被其他进程监听,则直接返回,不需要再注册
- 否则注册
public class HttpClientRegisterRepository extends FailbackRegistryRepository {// ...private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);private static URIRegisterDTO uriRegisterDTO;private static ApiDocRegisterDTO apiDocRegisterDTO;private String username;private String password;private List<String> serverList;/*** server -> accessToken.*/private LoadingCache<String, String> accessToken;// ...@Overridepublic void doPersistURI(final URIRegisterDTO registerDTO) {if (RuntimeUtils.listenByOther(registerDTO.getPort())) {// 1. 如果端口已被其他进程监听,则直接返回,不需要再注册return;}// 2. 否则注册doRegister(registerDTO, Constants.URI_PATH, Constants.URI);uriRegisterDTO = registerDTO;}private <T> void doRegister(final T t, final String path, final String type) {int i = 0;for (String server : serverList) {i++;String concat = server.concat(path);try {String accessToken = this.accessToken.get(server);if (StringUtils.isBlank(accessToken)) {throw new NullPointerException("accessToken is null");}// 1. 调用注册工具类进行注册RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);// considering the situation of multiple clusters, we should continue to execute here} catch (Exception e) {LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());if (i == serverList.size()) {throw new RuntimeException(e);}}}}
}
-
HttpClientRegisterRepository.doRegister()
- 调用注册工具类进行注册(代码如上)
-
RegisterUtils.doRegister()
- 构建 http 的 heade
- 在此通过 http 调用 Admin 的服务进行注册,
url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
json 为要传输的注册信息 - OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
public final class RegisterUtils {// ...public static void doRegister(final String json, final String url, final String type, final String accessToken) throws IOException {if (StringUtils.isBlank(accessToken)) {LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);return;}// 1. 构建 http 的 headerHeaders headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();// 2. 在此通过 http 调用 Admin 的服务进行注册,// url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;// json 为要传输的注册信息// 3. OkHttpTools 是 shenyu 对 okhttp 外部组件的封装String result = OkHttpTools.getInstance().post(url, json, headers);if (Objects.equals(SUCCESS, result)) {LOGGER.info("{} client register success: {} ", type, json);} else {LOGGER.error("{} client register error: {} ", type, json);}}
}
向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
-
ShutdownHookManager.addShutdownHook()
- 向运行时添加一个关机钩子,这个钩子是一个新线程,新线程去执行 ShutdownHookManager 管理的要在关机时执行的钩子
- 添加关闭应用时要执行的注销注册的钩子
public final class ShutdownHookManager {// ...private static final ShutdownHookManager MGR = new ShutdownHookManager();private final Set<HookEntry> hooks =Collections.synchronizedSet(new HashSet<HookEntry>()); static {// 1. 向运行时添加一个关机钩子,这个钩子是一个新线程,// 新线程去执行 ShutdownHookManager 管理的要在关机的钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {MGR.shutdownInProgress.set(true);for (Runnable hook : MGR.getShutdownHooksInOrder()) {try {hook.run();} catch (Throwable ex) {LOG.error(ex.getMessage(), ex);}}}));}// ...public void addShutdownHook(final Runnable shutdownHook, final int priority) {if (shutdownHook == null) {throw new IllegalArgumentException("shutdownHook cannot be NULL");}if (shutdownInProgress.get()) {throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");}// 2. 添加关闭应用时要执行的注销注册的钩子hooks.add(new HookEntry(shutdownHook, priority));}
}