前言
前面几篇文章分别学习了多线程的基本知识和线程池使用,这篇则为项目实践和整理。
项目参考
这里选择了两个 GitHub 项目作为参考,如果不方便下载,可以在下方留言评论,我会私发给你。
1. 马士兵老师的 JUC:讲述了多线程的基本知识(线程讲解)。
2.基本的线程演示:主要是对前面几篇讲解的回顾。
代码展示
BlockingQueue(阻塞队列)
package com.unicss;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Demonstrates a bounded {@link BlockingQueue}: ten producers each put one
 * element into a queue of capacity 3 while a single consumer drains it.
 */
public class MyBlockingQueue extends Thread {
    /** Shared bounded queue; {@code put} blocks while three elements are already waiting. */
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

    /** Value this producer puts into the queue. */
    private final int index;

    /**
     * @param i the value this producer will put into the shared queue
     */
    public MyBlockingQueue(int i) {
        this.index = i;
    }

    /** Puts this producer's index into the shared queue, blocking while the queue is full. */
    @Override
    public void run() {
        try {
            queue.put(String.valueOf(this.index));
            System.out.println("{" + this.index + "} in queue!");
        } catch (InterruptedException e) {
            // Restore the flag so the thread that runs this task can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Starts ten producers and one consumer on a cached thread pool.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            service.submit(new MyBlockingQueue(i));
        }
        // A plain Runnable is enough here: the pool supplies the thread.
        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        Thread.sleep((int) (Math.random() * 1000));
                        System.out.println("=======" + MyBlockingQueue.queue.size());
                        // A timed poll replaces the isEmpty()/take() pair: a queue that is only
                        // momentarily empty (producers not yet started) no longer ends the
                        // consumer early and strands producers blocked in put().
                        String str = MyBlockingQueue.queue.poll(
                                1000, java.util.concurrent.TimeUnit.MILLISECONDS);
                        if (str == null) {
                            break;
                        }
                        System.out.println(str + " has take!");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        };
        service.submit(consumer);
        service.shutdown();
    }
}
CompletionService(并发工具类,获取接口返回)
package com.unicss;import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link CompletionService}: ten tasks are submitted and their
 * results are printed in the order in which the tasks complete.
 */
public class MyCompletionService implements Callable<String> {
    /** Identifier printed by this task and included in its result. */
    private final int id;

    /**
     * @param i identifier of this task
     */
    public MyCompletionService(int i) {
        this.id = i;
    }

    /**
     * Submits ten tasks and prints each result as soon as it becomes available.
     *
     * @param args unused
     * @throws Exception if waiting for or retrieving a result fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            CompletionService<String> completion = new ExecutorCompletionService<String>(service);
            for (int i = 0; i < 10; i++) {
                completion.submit(new MyCompletionService(i));
            }
            for (int i = 0; i < 10; i++) {
                // take() blocks until the next finished task is available.
                System.out.println(completion.take().get());
            }
        } finally {
            // Shut the pool down even when take()/get() throws.
            service.shutdown();
        }
    }

    /**
     * Sleeps for a random time below one second.
     *
     * @return {@code "<id>:<sleep time in milliseconds>"}
     */
    @Override
    public String call() throws Exception {
        int time = (int) (Math.random() * 1000);
        try {
            System.out.println(this.id + " start");
            Thread.sleep(time);
            System.out.println(this.id + " end");
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the thread that runs this task.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return this.id + ":" + time;
    }
}
Executor
package com.unicss;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates a fixed-size thread pool: ten tasks are executed by four worker threads.
 */
public class MyExecutor extends Thread {
    /** Identifier printed when the task starts and ends. */
    private final int index;

    /**
     * @param i identifier of this task
     */
    public MyExecutor(int i) {
        this.index = i;
    }

    /** Prints a start message, sleeps for a random time below ten seconds, then prints an end message. */
    @Override
    public void run() {
        try {
            System.out.println("[" + this.index + "] start....");
            Thread.sleep((int) (Math.random() * 10000));
            System.out.println("[" + this.index + "] end.");
        } catch (InterruptedException e) {
            // Restore the flag so the pool thread can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Hands ten tasks to a pool of four threads and then requests shutdown.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            // service.submit(...) is the alternative noted by the original author.
            service.execute(new MyExecutor(i));
        }
        System.out.println("submit finish");
        service.shutdown();
    }
}
ReentrantLock(可重入的互斥锁)
package com.unicss;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Task that prints its id through a {@link TestReentrantLock} shared with other tasks.
 */
public class MyReentrantLock extends Thread {
    /** Printer shared by every task created in {@link #main(String[])}. */
    TestReentrantLock lock;

    /** Identifier handed to the printer. */
    private int id;

    /**
     * @param i    identifier of this task
     * @param test printer used by {@link #run()}
     */
    public MyReentrantLock(int i, TestReentrantLock test) {
        this.lock = test;
        this.id = i;
    }

    /** Delegates to {@code print} on the shared printer. */
    @Override
    public void run() {
        this.lock.print(this.id);
    }

    /**
     * Submits ten tasks that all use one printer to a cached thread pool.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        final TestReentrantLock sharedPrinter = new TestReentrantLock();
        final ExecutorService pool = Executors.newCachedThreadPool();
        int workerId = 0;
        while (workerId < 10) {
            pool.submit(new MyReentrantLock(workerId, sharedPrinter));
            workerId++;
        }
        pool.shutdown();
    }
}
/**
 * Printer whose {@link #print(int)} method is guarded by a {@link ReentrantLock},
 * so only one caller at a time is between its two messages.
 */
class TestReentrantLock {
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Prints an "acquired" message, sleeps for a random time below one second,
     * then prints a "released" message, all while holding the lock.
     *
     * @param str identifier included in both messages
     */
    public void print(int str) {
        // Acquire outside the try block: if lock() failed, the finally clause
        // must not call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            System.out.println(str + "获得");
            Thread.sleep((int) (Math.random() * 1000));
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            System.out.println(str + "释放");
            lock.unlock();
        }
    }
}
Semaphore(信号量)
package com.unicss;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demonstrates {@link Semaphore}: ten customers compete for two permits.
 */
public class MySemaphore extends Thread {
    /** Semaphore whose permits represent the free places. */
    Semaphore position;

    /** Customer number used in the printed messages. */
    private int id;

    /**
     * @param i customer number
     * @param s semaphore shared by all customers
     */
    public MySemaphore(int i, Semaphore s) {
        this.id = i;
        this.position = s;
    }

    /**
     * Reports whether a permit looks free, acquires one, holds it for a random
     * time below one second and releases it again.
     */
    @Override
    public void run() {
        try {
            // Informational only: the permit count may change before acquire() runs.
            if (position.availablePermits() > 0) {
                System.out.println("顾客[" + this.id + "]进入厕所,有空位");
            } else {
                System.out.println("顾客[" + this.id + "]进入厕所,没空位,排队");
            }
            position.acquire();
            try {
                System.out.println("顾客[" + this.id + "]获得坑位");
                Thread.sleep((int) (Math.random() * 1000));
                System.out.println("顾客[" + this.id + "]使用完毕");
            } finally {
                // Always hand the permit back; otherwise an interrupted customer
                // would leak it and main's acquireUninterruptibly(2) could wait forever.
                position.release();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Starts ten customers sharing two permits, then takes both permits for cleaning.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        ExecutorService list = Executors.newCachedThreadPool();
        Semaphore position = new Semaphore(2);
        for (int i = 0; i < 10; i++) {
            list.submit(new MySemaphore(i + 1, position));
        }
        list.shutdown();
        // Blocks until both permits are free at the same moment.
        // NOTE(review): nothing here waits for the customers to finish first, so this
        // may succeed before all of them have been served - confirm that is intended.
        position.acquireUninterruptibly(2);
        System.out.println("使用完毕,需要清扫了");
        position.release(2);
    }
}
CountDownLatch(倒计数器)
package com.unicss;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link CountDownLatch} with a race: ten runners wait for a
 * start signal and the main thread waits until all of them have finished.
 */
public class TestCountDownLatch {
    /**
     * Runs the race and prints "Game Start" / "Game Over" around it.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the runners
     */
    public static void main(String[] args) throws InterruptedException {
        // Start latch: released once by the main thread.
        final CountDownLatch begin = new CountDownLatch(1);
        // Finish latch: counted down once by each runner.
        final CountDownLatch end = new CountDownLatch(10);
        // Ten runners, one pool thread each.
        final ExecutorService exec = Executors.newFixedThreadPool(10);
        try {
            for (int index = 0; index < 10; index++) {
                final int runnerNo = index + 1;
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Blocks until the start latch reaches zero.
                            begin.await();
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("No." + runnerNo + " arrived");
                        } catch (InterruptedException e) {
                            // Do not swallow the interrupt; leave it visible to the pool thread.
                            Thread.currentThread().interrupt();
                        } finally {
                            // Count down even on failure so the main thread is never left waiting.
                            end.countDown();
                        }
                    }
                };
                exec.submit(run);
            }
            System.out.println("Game Start");
            // Brings the start latch to zero, releasing every runner.
            begin.countDown();
            // Blocks until all ten runners have counted down.
            end.await();
            System.out.println("Game Over");
        } finally {
            // Shut the pool down even if the main thread is interrupted while waiting.
            exec.shutdown();
        }
    }
}
CyclicBarrier(同步屏障)
package com.unicss;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link CyclicBarrier}: three tour groups travel at different
 * speeds and wait for each other at every city before moving on.
 */
public class TestCyclicBarrier {
    // Seconds needed per leg on foot: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
    private static int[] timeWalk = { 5, 8, 15, 15, 10 };
    // Seconds needed per leg by car
    private static int[] timeSelf = { 1, 3, 4, 4, 5 };
    // Seconds needed per leg by tour bus
    private static int[] timeBus = { 2, 4, 6, 6, 7 };

    /**
     * @return the current time formatted as {@code "HH:mm:ss: "}
     */
    static String now() {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(new Date()) + ": ";
    }

    /** One tour group; it visits the cities in order and waits at the barrier after each. */
    static class Tour implements Runnable {
        /** Cities in visiting order; {@code times[i]} is the travel time to {@code CITIES[i]}. */
        private static final String[] CITIES = {
            "Shenzhen", "Guangzhou", "Shaoguan", "Changsha", "Wuhan"
        };

        private int[] times;
        private CyclicBarrier barrier;
        private String tourName;

        /**
         * @param barrier  barrier shared by all tour groups
         * @param tourName name printed in the progress messages
         * @param times    travel time in seconds for each leg, one entry per city
         */
        public Tour(CyclicBarrier barrier, String tourName, int[] times) {
            this.times = times;
            this.tourName = tourName;
            this.barrier = barrier;
        }

        /** Travels to each city in turn, reports arrival and waits for the other groups. */
        @Override
        public void run() {
            try {
                for (int leg = 0; leg < CITIES.length; leg++) {
                    Thread.sleep(times[leg] * 1000L);
                    System.out.println(now() + tourName + " Reached " + CITIES[leg]);
                    barrier.await();
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible instead of silently dropping it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // Another group left the barrier abnormally; this group cannot continue.
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts three tour groups on a three-thread pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Three tour groups; the barrier opens only when all three have arrived.
        CyclicBarrier barrier = new CyclicBarrier(3);
        ExecutorService exec = Executors.newFixedThreadPool(3);
        exec.submit(new Tour(barrier, "WalkTour", timeWalk));
        exec.submit(new Tour(barrier, "SelfTour", timeSelf));
        // If the next submission is commented out, the program blocks and cannot proceed.
        exec.submit(new Tour(barrier, "BusTour", timeBus));
        // Original author's note: adding one more group, e.g.
        //   exec.submit(new Tour(barrier, "liu", timeBus));
        // leaves it blocked at the barrier, because the number of parties is fixed at three.
        exec.shutdown();
    }
}
ScheduledThread(定时线程)
package com.unicss;/*** 定时器*/
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
/**
 * Demonstrates {@link ScheduledExecutorService}: one task is scheduled both at
 * a fixed rate and with a fixed delay, and everything is stopped after 30 seconds.
 */
public class TestScheduledThread {
    /**
     * Schedules the two periodic runs and the delayed shutdown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        final Runnable beeper = new Runnable() {
            // The same Runnable is scheduled twice on a two-thread pool, so the
            // counter can be incremented concurrently and must be atomic.
            private final java.util.concurrent.atomic.AtomicInteger count =
                    new java.util.concurrent.atomic.AtomicInteger();

            @Override
            public void run() {
                System.out.println(new Date() + " beep " + count.incrementAndGet());
            }
        };
        // First run after 1 second, then every 2 seconds.
        final ScheduledFuture<?> beeperHandle =
                scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
        // First run after 2 seconds, then 5 seconds after each run completes.
        final ScheduledFuture<?> beeperHandle2 =
                scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
        // After 30 seconds cancel both periodic runs and shut the scheduler down.
        scheduler.schedule(new Runnable() {
            @Override
            public void run() {
                beeperHandle.cancel(true);
                beeperHandle2.cancel(true);
                scheduler.shutdown();
            }
        }, 30, SECONDS);
    }
}