我的最后两个博客讨论了长时间轮询和Spring的DeferredResult
技术,并且为了展示这些概念,我将我的Producer Consumer项目中的代码添加到了Web应用程序中。 尽管该代码演示了博客所提出的观点,但其逻辑上确实包含大量漏洞。 除了在实际的应用程序中不会使用简单的LinkedBlockingQueue
而是会选择JMS或其他一些具有工业实力的消息传递服务的事实,以及只有一个用户可以掌握匹配更新的事实之外,还有一个问题生成行为不佳的线程,这些线程在JVM终止时不会关闭。
您可能想知道为什么这应该是一个问题……对您而言,作为开发人员,这根本不是问题,这只是一点点草率的编程,但是对于您的一个操作人员而言,它可能会使生活变得不必要地困难。 这样做的原因是,如果您有太多行为异常的线程,那么键入Tomcat的shutdown.sh
命令将几乎没有效果,并且您必须通过键入以下命令来大胆地杀死Web服务器:
ps -ef | grep java
得到pid然后
kill -9 <<pid>>
…并且当您有一个Tomcat Web服务器字段来重新启动所有这些额外的问题时,这将变得非常痛苦。 当您键入shutdown.sh
您希望Tomcat停止。
在我的前两篇博客中,我创建的行为不良的线程具有以下run()
方法,其中第一个方法表现如下:
@Override public void run() { while (true) { try { DeferredResult<Message> result = resultQueue.take(); Message message = queue.take(); result.setResult(message); } catch (InterruptedException e) { throw new UpdateException("Cannot get latest update. " + e.getMessage(), e); } } }
在这段代码中,我使用了一个无限的while(true)
,这意味着线程将一直运行并且永不终止。
@Override public void run() { sleep(5); // Sleep to allow the reset of the app to load logger.info("The match has now started..."); long now = System.currentTimeMillis(); List<Message> matchUpdates = match.getUpdates(); for (Message message : matchUpdates) { delayUntilNextUpdate(now, message.getTime()); logger.info("Add message to queue: {}", message.getMessageText()); queue.add(message); } start = true; // Game over, can restart logger.warn("GAME OVER"); }
上面的第二个示例也表现不佳。 它将继续从MatchUpdates
列表中获取消息,并在适当的时候将其添加到消息队列中。 它们唯一的优点是,它们可能会引发InterruptedException
,如果处理不当,将导致线程终止。 但是,这不能保证。
确实,有一个快速修复程序……您要做的就是确保您创建的任何线程都是守护程序线程。 守护程序线程的定义是一个线程,它不会阻止JVM在程序完成时退出,但该线程仍在运行。 守护程序线程的通常示例是JVM的垃圾回收线程。 要将线程转换为守护程序线程,只需调用:
thread.setDaemon(true);
…然后当您键入shutdown.sh
然后输入WHAM时 ,所有线程将消失。 但是,这有一个问题。 如果您的守护程序线程中的一个正在做重要的事情,并在其主要时间内将其砍掉,会导致丢失一些非常重要的数据怎么办?
您需要做的是确保所有线程正常关闭,以完成它们当前正在进行的所有工作。 本博客的其余部分演示了针对这些错误线程的修复程序,通过使用ShutdownHook
优雅地协调了它们的ShutdownHook
。 根据文档 ,“ shutdown hook”只是一个已初始化但未启动的线程。 当虚拟机开始其关闭序列时,它将以某种未指定的顺序启动所有已注册的关闭挂钩,并使其同时运行。” 因此,在读完最后一句话之后,您可能已经猜到您需要做的是创建一个线程,该线程负责关闭所有其他线程,并作为关闭钩子传递给JVM。 所有这些都可以在几个小类中通用,并且可以通过对现有线程run()
方法执行一些棘手的操作来实现。
要创建的两个类是ShutdownService
和Hook
。 我将首先演示的Hook
类用于将ShutdownService
链接到您的线程。 Hook
的代码如下:
public class Hook { private static final Logger logger = LoggerFactory.getLogger(Hook.class); private boolean keepRunning = true; private final Thread thread; Hook(Thread thread) { this.thread = thread; } /** * @return True if the daemon thread is to keep running */ public boolean keepRunning() { return keepRunning; } /** * Tell the client daemon thread to shutdown and wait for it to close gracefully. */ public void shutdown() { keepRunning = false; thread.interrupt(); try { thread.join(); } catch (InterruptedException e) { logger.error("Error shutting down thread with hook", e); } }
}
Hook
包含两个实例变量: keepRunning
和thread
。 thread
是该线程的Hook
实例负责关闭的线程的引用,而keepRunning
告诉该线程…继续运行。
Hook
有两个公共方法: keepRunning()
和shutdown()
。 线程调用keepRunning()
以确定是否应继续运行,而ShutdownService
的shutdown钩子线程调用shutdown()
来使线程关闭。 这是两种方法中最有趣的。 首先,它将keepRunning
变量设置为false。 然后,它调用thread.interrupt()
来中断线程,迫使其引发InterruptedException
。 最后,它调用thread.join()
并等待thread
实例关闭。
请注意,此技术依赖于您所有线程的协作。 如果混合中有一个行为不佳的线程,那么整个事情可能会死机。 要解决此问题,请向thread.join(…)
添加超时。
@Service
public class ShutdownService { private static final Logger logger = LoggerFactory.getLogger(ShutdownService.class); private final List<Hook> hooks; public ShutdownService() { logger.debug("Creating shutdown service"); hooks = new ArrayList<Hook>(); createShutdownHook(); } /** * Protected for testing */ @VisibleForTesting protected void createShutdownHook() { ShutdownDaemonHook shutdownHook = new ShutdownDaemonHook(); Runtime.getRuntime().addShutdownHook(shutdownHook); } protected class ShutdownDaemonHook extends Thread { /** * Loop and shutdown all the daemon threads using the hooks * * @see java.lang.Thread#run() */ @Override public void run() { logger.info("Running shutdown sync"); for (Hook hook : hooks) { hook.shutdown(); } } } /** * Create a new instance of the hook class */ public Hook createHook(Thread thread) { thread.setDaemon(true); Hook retVal = new Hook(thread); hooks.add(retVal); return retVal; } @VisibleForTesting List<Hook> getHooks() { return hooks; }
}
ShutdownService
是一个Spring服务,其中包含一个Hook
类的列表,并因此通过推断线程负责关闭。 它还包含一个内部类ShutdownDaemonHook
,该类扩展了Thread
。 的一个实例ShutdownDaemonHook
的施工过程中创建ShutdownService
,然后通过调用传递给JVM作为关闭挂钩
Runtime.getRuntime().addShutdownHook(shutdownHook);
ShutdownService
具有一个公共方法: createHook()
。 该类要做的第一件事是确保传递给它的任何线程都转换为守护程序线程。 然后,它创建一个新的Hook
实例,将线程作为参数传入,最后将结果存储在列表中并将其返回给调用者。
现在剩下要做的唯一一件事就是将ShutdownService
集成到DeferredResultService
和MatchReporter
,这两个类包含行为不良的线程。
@Service("DeferredService")
public class DeferredResultService implements Runnable { private static final Logger logger = LoggerFactory.getLogger(DeferredResultService.class); private final BlockingQueue<DeferredResult<Message>> resultQueue = new LinkedBlockingQueue<>(); private Thread thread; private volatile boolean start = true; @Autowired private ShutdownService shutdownService; private Hook hook; @Autowired @Qualifier("theQueue") private LinkedBlockingQueue<Message> queue; @Autowired @Qualifier("BillSkyes") private MatchReporter matchReporter; public void subscribe() { logger.info("Starting server"); matchReporter.start(); startThread(); } private void startThread() { if (start) { synchronized (this) { if (start) { start = false; thread = new Thread(this, "Studio Teletype"); hook = shutdownService.createHook(thread); thread.start(); } } } } @Override public void run() { logger.info("DeferredResultService - Thread running"); while (hook.keepRunning()) { try { DeferredResult<Message> result = resultQueue.take(); Message message = queue.take(); result.setResult(message); } catch (InterruptedException e) { System.out.println("Interrupted when waiting for latest update. " + e.getMessage()); } } System.out.println("DeferredResultService - Thread ending"); } public void getUpdate(DeferredResult<Message> result) { resultQueue.add(result); } }
此类的第一个更改是在Shutdown
服务实例中自动连线。 接下来要做的是在创建线程之后但在thread.start()
之前,使用ShutdownService
创建Hook
的实例:
thread = new Thread(this, "Studio Teletype"); hook = shutdownService.createHook(thread); thread.start();
最后的更改是将while(true)
替换为:
while (hook.keepRunning()) {
…告诉线程何时退出while循环并关闭。
您可能还注意到,上面的代码中引发了一些System.out.println()
调用。 这是有原因的,这是因为执行关闭钩子线程的顺序不确定。 请记住,不仅您的类试图正常关闭,而且其他子系统也试图关闭。 这意味着我的原始代码logger.info(…)
失败, logger.info(…)
以下异常:
Exception in thread "Studio Teletype" java.lang.NoClassDefFoundError: org/apache/log4j/spi/ThrowableInformationat org.apache.log4j.spi.LoggingEvent.(LoggingEvent.java:159)at org.apache.log4j.Category.forcedLog(Category.java:391)at org.apache.log4j.Category.log(Category.java:856)at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:382)at com.captaindebug.longpoll.service.DeferredResultService.run(DeferredResultService.java:75)at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.ThrowableInformationat org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1714)at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1559)... 6 more
这是因为当我尝试调用记录器时,它已经被卸载。 同样,如文档所述:“ Shutdown hooks在虚拟机生命周期的微妙时间运行,因此应进行防御性编码。 尤其应将其编写为线程安全的,并尽可能避免死锁。 他们也不应盲目依赖可能已经注册了自己的关闭钩子的服务,因此自己可能正在关闭过程中。 尝试使用其他基于线程的服务,例如AWT事件调度线程,可能会导致死锁。”
MatchReport
类具有一些非常相似的修改。 主要区别在于hook.keepRunning()
代码位于run()
方法的for
循环内。
public class MatchReporter implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MatchReporter.class); private final Match match; private final Queue<Message> queue; private volatile boolean start = true; @Autowired private ShutdownService shutdownService; private Hook hook; public MatchReporter(Match theBigMatch, Queue<Message> queue) { this.match = theBigMatch; this.queue = queue; } /** * Called by Spring after loading the context. Will "kick off" the match... */ public void start() { if (start) { synchronized (this) { if (start) { start = false; logger.info("Starting the Match Reporter..."); String name = match.getName(); Thread thread = new Thread(this, name); hook = shutdownService.createHook(thread); thread.start(); } } } else { logger.warn("Game already in progress"); } } /** * The main run loop */ @Override public void run() { sleep(5); // Sleep to allow the reset of the app to load logger.info("The match has now started..."); long now = System.currentTimeMillis(); List<Message> matchUpdates = match.getUpdates(); for (Message message : matchUpdates) { delayUntilNextUpdate(now, message.getTime()); if (!hook.keepRunning()) { break; } logger.info("Add message to queue: {}", message.getMessageText()); queue.add(message); } start = true; // Game over, can restart logger.warn("GAME OVER"); } private void sleep(int deplay) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { logger.info("Sleep interrupted..."); } } private void delayUntilNextUpdate(long now, long messageTime) { while (System.currentTimeMillis() < now + messageTime) { try { Thread.sleep(100); } catch (InterruptedException e) { logger.info("MatchReporter Thread interrupted..."); } } } }
此代码的最终测试是在匹配更新序列中途发出Tomcat shutdown.sh
命令。 JVM终止时,它将从ShutdownDaemonHook
类调用shutdown钩子。 在执行此类的run()
方法时,它将在整个Hook
实例列表中循环,告诉它们关闭各自的线程。 如果在服务器日志文件的末尾添加tail -f
(在我的案例中为catalina.out,但Tomcat可能配置为与我不同),则会看到条目痕迹,使服务器正常关闭。
该博客随附的代码可在Github上找到: https : //github.com/roghughe/captaindebug/tree/master/long-poll 。
翻译自: https://www.javacodegeeks.com/2013/10/tomcats-graceful-shutdown-with-daemons-and-shutdown-hooks.html