1、并发编程
并发带来的问题:类中的属性值被不同的线程并发修改,而属性值的修改常常是在方法中去修改的,所以最终要解决方法被不同的线程并发执行。
并发编程的目标:同一个方法或者不同的方法,不能在不同的线程同时执行。
比如下面的示例中:
addIfNotExists 方法,不能被A线程和B线程同时执行。
removeIfExists 方法,不能被A线程和B线程同时执行。
addIfNotExists 和 removeIfExists 这两个方法不能被A线程和B线程同时执行。
synchronized 实现并发编程
private final Object lock = new Object();private List<ElementType> connections = new ArrayList<>();public void addIfNotExists(ElementType element) {synchronized (lock) {if (!connections.contains(element)) {connections.add(element);}}}public void removeIfExists(ElementType element) {synchronized (lock) {if (connections.contains(element)) {connections.remove(element);}}}
ReentrantLock 实现并发编程
synchronized是重量级锁,ReentrantLock通过 AQS 实现,当存在大量线程同时竞争锁时,ReentrantLock 通常比 synchronized 表现出更好的性能。
ReentrantLock 优点:
ReentrantLock可以设置为new ReentrantLock(true)公平锁,可以保证等待时间最长的线程最先获取锁。synchronized是非公平锁。
ReentrantLock可以设置多个条件 lock.newCondition() 实现更复杂的线程同步模式。
ReentrantLock可以实现锁中断。
private final Lock lock = new ReentrantLock();private List<ElementType> connections = new ArrayList<>();public void addIfNotExists(ElementType element) {lock.lock();try {if (!connections.contains(element)) {connections.add(element);}} finally {lock.unlock();}}public void removeIfExists(ElementType element) {lock.lock();try {if (connections.contains(element)) {connections.remove(element);}} finally {lock.unlock();}}
ReentrantLock 唤醒
使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()和synchronized锁对象的wait()、notify()、notifyAll()行为是一样的:
- await()会释放当前锁,进入等待状态;
- signal()会唤醒某个等待线程;
- signalAll()会唤醒所有等待线程;
- 唤醒线程从await()返回后需要重新获得锁。
private final static Lock lock = new ReentrantLock();private final static Condition condition = lock.newCondition();private final static List<ElementType> connections = new ArrayList<>();public static ElementType get() {lock.lock();try {// 不为空if (!connections.isEmpty()) {return connections.remove(connections.size() - 1);}// 为空,等待,最长等待60秒if (condition.await(60, TimeUnit.SECONDS)) {if (!connections.isEmpty()) {return connections.remove(connections.size() - 1);}}} catch (InterruptedException e) {// 恢复中断状态Thread.currentThread().interrupt();} finally {lock.unlock();}return null;}public static void addIfNotExists(ElementType element) {lock.lock();try {if (!connections.contains(element)) {connections.add(element);// signal会唤醒其中一个await线程condition.signal();}} finally {lock.unlock();}}
2、实现连接池功能
package com.study.pool;import java.sql.Connection;
import java.sql.DriverManager;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** 连接池*/
public class ConnectionPool {private static final int MAX_POOL_SIZE = 20; // 最大连接数private static int totalConnections = 0; // 总连接数private static final List<Connection> connections = new ArrayList<>(); // 空闲连接private static final Map<Connection, Long> connectionsMap = new HashMap<>(); // 空闲连接存活时间private static final Lock lock = new ReentrantLock();private static final Condition notice = lock.newCondition();/*** 加载驱动*/static {try {Class.forName("oracle.jdbc.driver.OracleDriver");} catch (ClassNotFoundException e) {e.printStackTrace();}}/*** 调用示例*/public static void main(String[] args) throws Exception {// 调用示例Connection connection = null;try {connection = getConnection();// 业务操作} catch (Exception e) {e.printStackTrace();} finally {releaseConnection(connection);}}/*** 定时器定时清理,超过最大存活时间的连接*/static {// 每隔1分钟检测1次,连接空闲5分钟删除ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);Runnable task = () -> {// 尝试获取锁,如果未获取到锁就不执行if (lock.tryLock()) {try {Iterator<Map.Entry<Connection, Long>> iterator = connectionsMap.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<Connection, Long> entry = iterator.next();Connection connection = entry.getKey();Long value = entry.getValue();// 连接空闲时间超过5分钟if (System.currentTimeMillis() > value + 1000 * 60 * 5) {// 删除连接connections.remove(connection);iterator.remove();totalConnections--;connection.close();}}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}};// 初始延迟0毫秒后,每隔1分钟执行一次任务executorService.scheduleAtFixedRate(task, 0, 1, TimeUnit.MINUTES);}/*** 从连接池获取连接*/public static Connection getConnection() {lock.lock();try {// 尝试从连接池中获取连接if (!connections.isEmpty()) {return connections.remove(connections.size() - 1);}// 连接池为空,创建新连接if (totalConnections < MAX_POOL_SIZE) {// 创建新连接Connection connection = DriverManager.getConnection("jdbc:oracle:thin:@192.168.111.201:1521:orcl", "root", "root");totalConnections++;return connection;}// 连接已到达最大连接数,等待归还连接,最多等待30秒,await()会释放当前线程的锁if (notice.await(30, TimeUnit.SECONDS)) {if (!connections.isEmpty()) {return connections.remove(connections.size() - 1);}}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}return null;}/*** 将连接归还到连接池*/public static void releaseConnection(Connection connection) {lock.lock();try {if (connection != null && !connections.contains(connection)) {// 将连接归还到连接池connections.add(connection);connectionsMap.put(connection, System.currentTimeMillis());// 通知有空闲连接,signal()会唤醒其中一个await()线程notice.signal();}} finally {lock.unlock();}}/*** 删除连接*/public static void removeConnection(Connection connection) {lock.lock();try {if (connection != null && connections.contains(connection)) {// 删除连接connections.remove(connection);connectionsMap.remove(connection);totalConnections--;connection.close();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}
1、lock.lock()和lock.unlock()代码块,保证所有方法都不能在不同的线程中同时执行,保证connections、connectionsMap、totalConnections等变量的线程安全。
lock.lock();try {// 业务} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}
2、notice.await()方法会阻塞,并释放当前线程的锁,被signal()或者signalAll()唤醒后,并抢到锁后不再阻塞继续往下执行。
也就是说 await() 会先类似 lock.unlock() 释放当前线程的锁,阻塞等待结束继续执行的时候,会再次类似 lock.lock() 给当前线程加锁(这说法不正确只是方便理解,实际是抢到锁后才继续执行)
notice.await(30, TimeUnit.SECONDS) 这里 await 等待30秒后,不再等待返回false,但是能否往下执行,还要看是否能抢到锁,
通俗地说 notice.await(30, TimeUnit.SECONDS) 这行代码最大阻塞时间不是30秒,这行代码阻塞几分钟也是可能的,如果一直未抢到锁,会一直阻塞直到抢到锁。