并发包
并发包的来历:
在实际开发中如果不需要考虑线程安全问题,大家不需要做线程安全,因为如果做了反而性能不好!
但是开发中有很多业务是需要考虑线程安全问题的,此时就必须考虑了。否则业务出现问题。
Java为很多业务场景提供了性能优异,且线程安全的并发包,程序员可以选择使用!Map集合中的经典集合:HashMap它是线程不安全的,性能好
- 如果在要求线程安全的业务情况下就不能用这个集合做Map集合,否则业务会崩溃~
- 为了保证线程安全,可以使用Hashtable。注意:线程中加入了计时
- Hashtable是线程安全的Map集合,但是性能较差!(已经被淘汰了,虽然安全,但是性能差)
- 为了保证线程安全,再看ConcurrentHashMap(不止线程安全,而且效率高,性能好,最新最好用的线程安全的Map集合)
- ConcurrentHashMap保证了线程安全,综合性能较好!
小结:
HashMap是线程不安全的。
Hashtable线程安全基于synchronized,综合性能差,被淘汰了。
ConcurrentHashMap:线程安全的,分段式锁,综合性能最好,线程安全开发中推荐使用
HashMap线程不安全验证及解决方案
public class ConcurrentHashMapDemo01 {//演示HashMap在高并发下的线程不安全性public static Map<String ,String> maps = new HashMap<>();public static void main(String[] args){Runnable target = new MyRunnable();Thread t1 = new Thread(target,"Thread-01");Thread t2 = new Thread(target,"Thread-02");t1.start();t2.start();try {t1.join(); //让两个线程跑完 使主线程不抢t1的CPUt2.join(); //只能t1,t2两个互相抢} catch (InterruptedException e) {throw new RuntimeException(e);}//等线程执行完毕获取集合最终元素个数System.out.println("元素个数:" + maps.size());}
}
class MyRunnable implements Runnable{@Overridepublic void run(){for(int i = 1;i <= 500000;i ++){ //第一个线程加了50万,第二个线程也加了50万ConcurrentHashMapDemo01.maps.put(Thread.currentThread().getName() + i,Thread.currentThread().getName() + i);}}
}
输出结果:元素个数:947823 不是1000000
public static Map<String ,String> maps = new Hashtable<>();
定义成这种线程就安全了,但性能差,源码全是加锁
public static Map<String ,String> maps = new ConcurrentHashMap<>();//线程安全,性能得到极大提升
CountDownLatch并发包
CountDownLatch允许一个或多个线程等待其他线程完成操作,再执行自己。
- 例如:线程1要执行打印:A和C,线程2要执行打印:B,但线程1在打印A后,要线程2打印B之后才能打印C,所以:线程1在打印A后,必须等待线程2打印完B之后才能继续执行
需求:
提供A线程,打印 A , C
提供B线程,打印 B构造器:
public CountDownLatch(int count)// 初始化唤醒需要的down几步。方法:
public void await() throws InterruptedException// 让当前线程等待,必须down完初始化的数字才可以被唤醒,否则进入无限等待
public void countDown() // 计数器进行减1 (down 1)小结:
CountDownLatch可以用于让某个线程等待几步才可以继续执行, 从而可以实现控制线程执行的流程!
创建了几个Down就要c.countDown();几次,否则会出错!!!
public class CountDownLatchDemo01 {public static void main(String[] args) {//创建countdownlatch对象用于监督A、B线程执行情况_监督CountDownLatch c = new CountDownLatch(1);new MyThread_A(c).start();new MyThread_B(c).start();}
}
class MyThread_A extends Thread{private CountDownLatch c;public MyThread_A(CountDownLatch c) {this.c = c;}@Overridepublic void run(){System.out.println("A");//等待自己,当前线程让出CPU等待自己try {c.await();} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("C");}
}
class MyThread_B extends Thread{private CountDownLatch c;public MyThread_B(CountDownLatch c) {this.c = c;}@Overridepublic void run(){System.out.println("B");c.countDown(); //让计数器减1,被等待的线程就唤醒了}
}
CyclicBarrier并发包
- CyclicBarrier作用:
某个线程任务必须等待其他线程执行完毕以后才能最终触发自己执行。- 例如:公司召集5名员工开会,等5名员工都到了,会议开始。
我们创建5个员工线程,1个开会任务,几乎同时启动
使用CyclicBarrier保证5名员工线程全部执行后,再执行开会线程。- 构造器:
public CyclicBarrier(int parties, Runnable barrierAction)
// 用于在线程到达屏障5时,优先执行barrierAction,方便处理更复杂的业务场景- 方法:
public int await()
// 每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞- 小结:
可以实现多线程中,某个任务在等待其他线程执行完毕以后触发。
循环屏障可以实现达到一组屏障就触发一个任务执行!
public class CyclicBarrierDemo01 {public static void main(String[] args) {//2.创建循环屏障对象,等到5个线程执行完毕后触发一次线程任务(开会)CyclicBarrier c = new CyclicBarrier(5,new Meeting() );//1.创建一个任务循环屏障对象for(int i = 1;i <= 10;i ++){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}new EmployeeThread("Employee-" + i,c).start();}}
}
class Meeting implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "开始组织会议!");}
}
class EmployeeThread extends Thread{private CyclicBarrier c;public EmployeeThread(String name,CyclicBarrier c){super(name);this.c = c;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "正在进入会议室……");try {c.await();//每个线程调用await方法,告诉c我已经到达了屏障,然后当前线程被回收} catch (InterruptedException | BrokenBarrierException e) {throw new RuntimeException(e);}}
}
达到一组屏障就触发一个任务执行!达到两组屏障就触发两个任务执行!
Semaphore并发包(*)
引入:
- Semaphore(发信号)的主要作用是控制线程的并发数量。
- synchronized可以起到"锁"的作用,但某个时间段内,只能有一个线程允许执行。
Semaphore可以设置同时允许几个线程执行。- Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目。
Semaphore的构造器:
- public Semaphore(int permits): permits 表示许可线程的数量
- public Semaphore(int permits, boolean fair):fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
- Semaphore的方法:
public void acquire() throws InterruptedException 表示获取许可
public void release() release() 表示释放许可小结:
Semaphore可以控制并发线程同时进行的数量。
public class SemaphoreDemo01 {public static void main(String[] args) {Service service = new Service(); //一个业务,多个线程调用for(int i = 1;i <= 5;i ++) //5个线程{Thread a = new MyThread(service);a.start();}}
}
//线程类跑代码
class MyThread extends Thread{private Service service;public MyThread(Service service){this.service = service;}@Overridepublic void run() {service.login();}
}
//业务
class Service{//表示许可!最多允许1个线程执行acquire()和release()之间的内容private Semaphore semaphore = new Semaphore(1); //同时只允许1个进来 控制流量//登录功能public void login(){try {semaphore.acquire(); //上锁System.out.println(Thread.currentThread().getName()+ "进入时间:" + System.currentTimeMillis());try {Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + "登录成功!");} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName()+ "离开时间:" + System.currentTimeMillis());semaphore.release(); //开锁}catch (Exception e){e.printStackTrace();}}
}
Exchanger并发包
作用
- Exchanger(交换者)是一个用于线程间协作的工具类。
- Exchanger用于进行线程间的数据交换。
- 这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
Exchanger构造方法:
public Exchanger()
Exchanger重要方法:
public V exchange(V x)分析:
(1)需要2个线程
(2)需要一个交换对象负责交换两个线程执行的结果。小结:
Exchanger可以实现线程间的数据交换。
一个线程如果等不到对方的数据交换就会一直等待。
我们也可以控制一个线程等待的时间。
必须双方都进行交换才可以正常进行数据的交换。
public class ExchangerDemo01 {public static void main(String[] args) {Exchanger<String> exchanger = new Exchanger<>();new Boy(exchanger).start();new Girl(exchanger).start();}
}
class Boy extends Thread{private Exchanger<String> exchanger;public Boy(Exchanger<String> exchanger){this.exchanger = exchanger;}@Overridepublic void run() {//交换结果try {System.out.println("男孩开始制作定情信物:心锁");String rs = exchanger.exchange("心锁");System.out.println("男孩收到了:" + rs);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}class Girl extends Thread{private Exchanger<String> exchanger;public Girl(Exchanger<String> exchanger){this.exchanger = exchanger;}@Overridepublic void run() {System.out.println("女孩开始制作定情信物:心匙");//交换结果try {String rs = exchanger.exchange("心匙");System.out.println("女孩收到了" + rs);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
时间延时判断
public class ExchangerDemo01 {public static void main(String[] args) {Exchanger<String> exchanger = new Exchanger<>();new Boy(exchanger).start();new Girl(exchanger).start();}
}
class Boy extends Thread{private Exchanger<String> exchanger;public Boy(Exchanger<String> exchanger){this.exchanger = exchanger;}@Overridepublic void run() {//交换结果try {System.out.println("男孩开始制作定情信物:心锁");//等待5秒,对方还不交换就抛出异常String rs = exchanger.exchange("心锁",5, TimeUnit.SECONDS);System.out.println("男孩收到了:" + rs);} catch (InterruptedException | TimeoutException e) {throw new RuntimeException(e);}}
}class Girl extends Thread{private Exchanger<String> exchanger;public Girl(Exchanger<String> exchanger){this.exchanger = exchanger;}@Overridepublic void run() {System.out.println("女孩开始制作定情信物:心匙");//交换结果try {Thread.sleep(6000); //犹豫6秒String rs = exchanger.exchange("心匙");System.out.println("女孩收到了" + rs);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}