拥有梦想,即拥有了生命的火种。
梦想是一座高山,攀爬起来虽然艰辛,但一旦到达顶峰,你的努力就将被铭记于人心。
梦想是一个拼图,每一次努力都是一块拼图,最终汇成一个完整的梦想。
梦想是你的信念,即使远离了人群,它仍然激励你前进。
只有持续不断地前进,才能迎接你成就梦想的那一刻。
目录
上一篇博客习题讲解
实现自己的ThreadPoolExecutor,并测试其行为
自定义线程池
测试不同场景下的线程池行为
修改TaskQueue的行为,使其在某些条件下拒绝接受新任务
使用AsyncContext创建一个异步Servlet应用
探索Tomcat源码中关于NIO的具体实现细节
第10章 并发编程应用
10.1 JVM与多线程
10.2 Servlet与多线程
10.3 懒汉与饿汉模式
10.4 数据库Connection 与多线程
10.4.1 ThreadLocal与线程私有数据
10.4.2 ThreadLocal 存储数据库 Connection
10.4.3 ThreadLocal 实现Connection per logic 模式
10.4.4 ThreadLocal 实现Connection per request 模式
10.5 高并发网站的PageView统计
10.6 生成唯一的订单号
10.7 浏览器并发请求限制
10.8 NIO与多路复用
10.9 远程异步访问
10.10 防止缓存雪崩的DCL机制
10.11 分布式锁解决商品超卖
上一篇博客习题讲解
Java多线程与线程池技术详解(九)
Java多线程与线程池技术详解(九)-CSDN博客文章浏览阅读1.1k次,点赞29次,收藏13次。Tomcat的线程池是基于Java的实现的,但为了适应Web服务器的需求,它做了许多定制化处理。在创建自定义的时,可以指定核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、空闲线程存活时间(keepAliveTime)、任务队列(workQueue)等参数。Tomcat中的与标准JDK版本不同,它增加了对提交任务计数的支持,并且在执行任务失败时会尝试将任务重新加入到任务队列中。// 自定义ThreadPoolExecutor构造函数// 预热所有核心线程。https://blog.csdn.net/speaking_me/article/details/144418617?spm=1001.2014.3001.5502
实现自己的ThreadPoolExecutor,并测试其行为
为了实现一个自定义的ThreadPoolExecutor
,我们将创建一个继承自ThreadPoolExecutor
的类,并重写一些方法来添加暂停/恢复机制。此外,我们还将探索不同的拒绝策略,如AbortPolicy
、CallerRunsPolicy
等,并根据应用场景选择最合适的策略。
自定义线程池
首先,我们需要构建一个自定义线程池,这包括设置核心线程数(corePoolSize
)、最大线程数(maximumPoolSize
)、保持活动时间(keepAliveTime
)、工作队列(workQueue
)和拒绝策略(handler
)。这里我们以PausableThreadPoolExecutor
为例,它允许我们在运行时暂停和恢复线程池中的任务执行。
import java.util.concurrent.*;public class PausableThreadPoolExecutor extends ThreadPoolExecutor {private final ReentrantLock lock = new ReentrantLock();private final Condition condition = lock.newCondition();private volatile boolean isPaused = false;public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);lock.lock();try {while (isPaused) {try {condition.await(); // Wait until the thread pool is resumed.} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("Thread was interrupted", e);}}} finally {lock.unlock();}}public void pause() {lock.lock();try {isPaused = true;} finally {lock.unlock();}}public void resume() {lock.lock();try {isPaused = false;condition.signalAll(); // Notify all waiting threads to proceed.} finally {lock.unlock();}} }
测试不同场景下的线程池行为
接下来,我们要编写单元测试用例来验证线程池的各种特性是否按预期工作。例如,我们可以测试高负载情况下的表现,以及当线程池满时的任务处理方式。通过这些测试,可以确保我们的自定义线程池在各种条件下都能稳定运行。
import org.junit.Test; import static org.junit.Assert.*;public class PausableThreadPoolExecutorTest {@Testpublic void testPauseAndResume() throws InterruptedException {PausableThreadPoolExecutor executor = new PausableThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());// Submit tasks that will be paused and resumed.for (int i = 0; i < 5; i++) {final int taskId = i;executor.submit(() -> {System.out.println("Task " + taskId + " started.");try {Thread.sleep(1000); // Simulate work.} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Task " + taskId + " finished.");});}// Pause the executor.executor.pause();Thread.sleep(500); // Allow time for tasks to potentially start.// Ensure no more tasks are running after pausing.assertTrue(executor.getQueue().size() > 0);// Resume the executor.executor.resume();Thread.sleep(2000); // Allow enough time for all tasks to complete.// Check if all tasks have been processed.assertEquals(0, executor.getQueue().size());} }
修改TaskQueue的行为,使其在某些条件下拒绝接受新任务
对于修改任务队列的行为,使之能够在特定条件下拒绝接收新任务,可以通过自定义阻塞队列或者直接修改线程池的提交逻辑来实现。下面的例子展示了如何基于系统负载决定是否接受新任务:
import java.util.concurrent.*;public class LoadSensitiveBlockingQueue<T> extends LinkedBlockingQueue<T> {private final int maxLoad;private AtomicInteger currentLoad = new AtomicInteger(0);public LoadSensitiveBlockingQueue(int capacity, int maxLoad) {super(capacity);this.maxLoad = maxLoad;}@Overridepublic boolean offer(T e) {if (currentLoad.get() >= maxLoad) {System.out.println("The system is under heavy load, rejecting new tasks.");return false; // Reject new tasks when the system is under heavy load.} else {currentLoad.incrementAndGet();return super.offer(e);}}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {T task = super.poll(timeout, unit);if (task != null) {currentLoad.decrementAndGet();}return task;} }
使用AsyncContext创建一个异步Servlet应用
创建一个使用AsyncContext
API 的异步Servlet应用可以帮助提高服务器资源利用率和服务响应速度。即使是在处理长时间运行的操作时,也不会阻塞主线程,从而改善用户体验。
下面是一个简单的例子:
@WebServlet(urlPatterns = "/asyncServlet", asyncSupported = true) public class AsyncServletExample extends HttpServlet {protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {final AsyncContext asyncContext = request.startAsync();asyncContext.setTimeout(10 * 60 * 1000); // Set timeout to 10 minutes.asyncContext.start(() -> {try {// Simulate a long-running operation.Thread.sleep(5000); // Sleep for 5 seconds.PrintWriter out = asyncContext.getResponse().getWriter();out.write("This message was generated asynchronously.");out.flush();asyncContext.complete();} catch (InterruptedException | IOException e) {e.printStackTrace();}});} }
探索Tomcat源码中关于NIO的具体实现细节
最后,深入研究Tomcat源码中与NIO相关的部分,特别是从6.x版本开始支持的非阻塞I/O模型。了解Tomcat是如何利用Java NIO特性(如Selector
、Channel
、Buffer
)来管理大量并发连接的,可以帮助我们更好地理解现代Web服务器的工作原理。要开始这个过程,可以从下载或克隆Tomcat项目的GitHub仓库做起,然后关注配置文件server.xml
中的Connector节点,尤其是那些指定了协议为org.apache.coyote.http11.Http11NioProtocol
的地方。接着,查看NioEndpoint
组件的工作流程,包括Acceptor
、Poller
和SocketProcessor
等角色的作用。注意Tomcat是如何处理连接限制、事件轮询及任务分配的,这些都是保证高效并发处理的基础。
第10章 并发编程应用
10.1 JVM与多线程
Java虚拟机(JVM)为Java应用程序提供了运行时环境,它不仅负责解释字节码、管理内存,还支持多线程编程。在Java中,每个线程都是操作系统级别的实体,并且每个Java线程都会映射到一个操作系统的原生线程。因此,线程的创建、销毁和调度都由操作系统负责。JVM通过其内置的线程管理机制支持多线程编程,使得编写多线程程序变得相对容易,因为JVM提供了丰富的工具来帮助开发者处理并发问题。
代码示例:
// 创建一个新的线程并启动它。 Thread thread = new Thread(() -> {// 线程执行体System.out.println("Thread is running."); }); thread.start(); // 启动线程
10.2 Servlet与多线程
Servlet容器默认采用单实例多线程的方式来处理请求。当Web服务器启动的时候(或客户端发送请求到服务器时),Servlet就被加载并实例化(只存在一个Servlet实例);当请求到达时,Servlet容器通过调度线程池中等待执行的线程给请求者;线程执行Servlet的service()
方法;请求结束,放回线程池,等待被调用。这种模式提高了请求的响应时间,但同时也要求开发者特别注意线程安全问题,例如避免使用实例变量(成员变量),因为如果存在成员变量,可能发生多线程同时访问该资源时,导致数据的不一致。
代码示例:
@WebServlet("/example") public class ExampleServlet extends HttpServlet {private static final long serialVersionUID = 1L;@Overrideprotected void doGet(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {// 处理GET请求response.getWriter().println("Handling GET request.");}@Overrideprotected void doPost(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {// 处理POST请求response.getWriter().println("Handling POST request.");} }
10.3 懒汉与饿汉模式
懒汉模式指的是在第一次调用getInstance()
方法时才创建单例对象,而饿汉模式则是在类加载时就创建了单例对象。懒汉模式可以延迟加载,节省内存资源,但在多线程环境下需要考虑线程安全问题,通常会使用双重检查锁定(Double-Checked Locking, DCL)模式来确保线程安全。而饿汉模式简单直接,但由于实例在类加载时就已经创建,因此不能实现延迟加载。
代码示例:
懒汉模式:
public class LazySingleton {private static volatile LazySingleton instance = null;private LazySingleton() {}public static LazySingleton getInstance() {if (instance == null) { // 第一次检查synchronized (LazySingleton.class) {if (instance == null) { // 第二次检查instance = new LazySingleton();}}}return instance;} }
饿汉模式:
public class HungrySingleton {private static final HungrySingleton INSTANCE = new HungrySingleton();private HungrySingleton() {}public static HungrySingleton getInstance() {return INSTANCE;} }
10.4 数据库Connection 与多线程
10.4.1 ThreadLocal与线程私有数据
ThreadLocal
为每个线程提供了一个独立的变量副本,使得每个线程都可以独立地访问自己的副本而不受其他线程的影响。这为解决多线程环境下的共享资源问题提供了一种有效的解决方案。
10.4.2 ThreadLocal 存储数据库 Connection
为了确保每个线程都有自己独立的数据库连接,可以通过ThreadLocal
来存储每个线程的Connection
对象。这样即使在同一应用程序的不同线程之间也可以保持各自的数据库会话隔离,从而避免了多线程共享同一个Connection
所带来的潜在问题。
代码示例:
public class DatabaseHelper {private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<>();public static Connection getConnection() throws SQLException {Connection conn = connectionHolder.get();if (conn == null) {conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");connectionHolder.set(conn);}return conn;}public static void closeConnection() {Connection conn = connectionHolder.get();if (conn != null) {try {conn.close();} catch (SQLException e) {// handle exception} finally {connectionHolder.remove();}}} }
10.4.3 ThreadLocal 实现Connection per logic 模式
在这种模式下,对于每个业务逻辑单元(如事务边界内的所有操作),都会分配一个新的Connection
,并通过ThreadLocal
保证该Connection
仅限于当前逻辑单元内使用。一旦逻辑单元完成,即关闭此Connection
,以释放资源。
10.4.4 ThreadLocal 实现Connection per request 模式
对于Web应用而言,每次HTTP请求都应该有自己的数据库连接。利用ThreadLocal
可以在每次请求开始时创建一个新的Connection
,并在请求结束时关闭它,确保每个请求都有独立的数据库会话。
代码示例:
@WebServlet("/dbOperation") public class DbOperationServlet extends HttpServlet {private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<>();@Overrideprotected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {Connection conn = null;try {conn = DatabaseHelper.getConnection();// 执行数据库操作...} catch (SQLException e) {// 处理异常...} finally {DatabaseHelper.closeConnection();}} }
10.5 高并发网站的PageView统计
在高并发场景下,准确统计页面浏览量(PageView)是一个挑战。一种常见的做法是引入消息队列或缓存层来缓冲实时计数,并定期将这些增量更新同步到持久化存储中。此外,还可以采用分布式锁或其他一致性协议来防止竞态条件的发生。
10.6 生成唯一的订单号
为了确保订单号的唯一性,可以结合时间戳、机器ID以及序列号来构造订单号。比如,可以使用UUID或者基于雪花算法(Snowflake Algorithm)生成全局唯一的ID。这种方法能够有效地避免重复订单号的问题。
代码示例:
public class OrderIdGenerator {private static final AtomicLong sequence = new AtomicLong(0);public static String generateOrderId() {return String.format("%s-%d", UUID.randomUUID().toString(), sequence.incrementAndGet());} }
10.7 浏览器并发请求限制
现代浏览器对同一域名下的并发请求数量有一定的限制,通常是6个左右。超出这个数量的请求会被排队等待直到有空闲的连接可用。开发者应该了解这一点,并优化前端代码以减少不必要的请求,提高用户体验。
10.8 NIO与多路复用
NIO(New I/O)提供了非阻塞I/O的支持,允许一个线程处理多个网络连接。通过选择器(Selector
)机制,可以监听多个通道(Channel
)上的事件,如读写就绪等,从而实现高效的并发处理。这种方式特别适合构建高性能的服务端应用。
10.9 远程异步访问
远程异步访问是指客户端发起请求后不需要立即等待响应,而是继续执行后续任务,待服务端完成处理后再通知客户端结果。这种方式可以显著提升系统的响应速度和服务能力。实现上可以通过回调函数、Future/Promise模式或者消息队列等方式达成。
10.10 防止缓存雪崩的DCL机制
缓存雪崩指的是大量缓存同时失效,导致瞬间大量的请求直接打到数据库上,造成系统压力骤增。为了避免这种情况,可以在缓存设置时加入随机的时间偏移量,分散缓存过期时间;同时,在获取缓存时采用双重检查锁定(Double-Checked Locking, DCL)模式,确保只有一个线程负责加载新数据。
10.11 分布式锁解决商品超卖
分布式环境中,多个节点可能同时尝试修改相同的资源,如库存数量,这就可能导致商品超卖等问题。使用分布式锁可以在多个服务实例间协调对共享资源的访问,确保同一时刻只有一个实例能够进行更新操作。常见的分布式锁实现包括Redis、Zookeeper等。
以上内容涵盖了从基础概念到高级话题的广泛领域,旨在帮助初学者全面理解并发编程的核心思想和技术细节。每个部分都包含了理论知识和实践指导。