14 多线程
操作系统的多任务(multitasking):在同一时刻运行多个程序的能力。
多线程在较低的层次上扩展了多任务的概念:一个程序同时执行多个任务。
通常,每一个任务称为一个线程(tread),它是线程控制的简称。
可以同时运行一个以上线程的程序称为多线程程序(multithreaded)。
多进程与多线程的区别:
线程拥有自己的一整套变量;线程只是共享数据。
共享变量使进程之间的通信比进程之间的通信更有效、更容易。
与进程相比较,线程更轻量级,创建、撤销一个线程比启动新进程的开销要小的多。
实际应用中,多线程非常有用。
例如,一个浏览器可以同时下载几幅图片、一个web服务器需要同时处理几个并发请求、GUI用一个独立的线程来收集用户界面事件。
多线程可能相当复杂。本章涵盖了应用程序可能需要的全部工具。
14.1 什么是线程
Thread类的静态sleep方法将暂停给定的毫秒数。
在一个单独的线程中执行一个任务的简单过程:
1、将任务代码移到实现了Runnable接口的类的run方法中;
public interface Runnable
{void run();
}
class MyRunnable implements Runnable
{public void run(){ task code }
}
2、创建一个类对象;
Runnable r = new MyRunnable();
3、由Runnable创建一个Thread对象;
Thread t = new Thread(r);
4、启动线程;
t.start();
14.2 中断线程
当线程的run方法执行方法体中的最后一条语句后,并经由执行return语句返回时,或者出现了在方法中没有捕获的异常时,线程将终止。
interrupt方法将中断状态置位。
每个线程都应该不时的检查这个标志,以判断线程是否被中断。
首先调用静态的Thread.currentThread方法获得当前线程,然后调用isInterrupted方法:
Thread.currentThread().isInterrupted()
如果线程被阻塞,就无法检测中断状态。
当在一个被阻塞的线程(调用sleep或wait)上调用interrupt方法时,阻塞调用将会被InterruptException异常中断。
存在不能被中断的阻塞I/O调用,应该考虑选择可中断的调用。
没有任何语言方面的需求要求一个被中断的线程应该终止。中断一个线程不过是引起它的注意。被中断的线程可以决定如何响应中断。
如果在中断状态被置位时调用sleep方法,它不会休眠。其清除这一状态并抛出InterruptedException。
静态的interrupted方法检测当前线程是否被中断,并且会清除中断状态。
isInterrupted方法是一个实例方法,来检测是否被中断,不改变中断状态。
void mySubTask()
{try{ sleep(delay); }catch (InterruptedException e) { } //DON’T IGNORE!
}
void mySubTask() throws InterruptedException
{sleep(delay);
}
void mySubTask()
{try{ sleep(delay); }catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
14.3 线程状态
6种:
New
Runnable
Blocked
Waiting
Timed waiting
Terminated
新创建进程:
new Thread(r);
可运行状态:
一旦调用start方法,线程处于runnable状态。
可运行的线程可能正在运行,也可能没有运行。
Java的规范说明没有将它作为一个单独状态。
一个正在运行中的线程仍然处于可运行状态。
运行中的线程被中断,目的是为了让其他线程获得运行机会。
抢占式调度系统给每一个可运行线程一个时间片来执行任务。
当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程运行机会。
当选择下一个线程时,操作系统考虑线程的优先级。
现在所有的桌面以及服务器操作系统都使用抢占式调度。
像手机这样的小型设备可能使用协作式调度。在这样的设备中,一个线程只有在调用yield方法、或者被阻塞或等待时,线程才会失去控制权。
在具有多个处理器的机器上,每一个处理器运行一个线程,可以有多个线程并行运行。
如果线程的数目多于处理器的数目,调度器依然采用时间片机制。
在任何给定时刻,一个可运行的线程可能正在运行也可能没有运行。
被阻塞进程和等待进程:
当线程处于被阻塞或等待状态时,它暂时不活动。
被终止的线程:
·run方法正常退出;
·没有捕获的异常终止了run方法。
14.4 线程属性
线程优先级 守护线程线程组 处理未捕获异常的处理器
线程优先级:
默认情况下,一个线程继承它的父线程的优先级。
setPriority方法
1-10之间
MIN_PRIORITY 1
NORM_PRIORITY 5
MAX_PRIORITY 10
线程优先级是高度依赖于系统的。
当虚拟机依赖于宿主机平台的线程实现机制时,Java线程的优先级被映射到宿主机平台的优先级上,优先级个数也许更多,也许更少。
例如,Windows中有7个优先级别。一些Java优先级将映射到相同的操作系统优先级。
在Sun为Linux提供的Java虚拟机,线程的优先级被忽略——所有线程具有相同的优先级。
不要将程序功能的正确性依赖于优先级。
守护线程:
t.setDaemon(true);
这个方法必须在线程启动之前调用。
将线程转换为守护线程(daemon thread)。
守护线程的唯一用途:为其他线程服务。如计时线程。
当只剩下守护线程时,虚拟机就退出了。
未捕获异常处理器:
线程的run方法不能抛出任何被检测的异常,但是,不被检测的异常会导致线程终止。
在线程死亡之前,异常被传递到一个用于未捕获异常的处理器。
该处理器必须属于一个实现Thread.UncaughtExceptionHandler接口的类。
这个接口只有一个方法:void uncaughtException(Thread t, Throwable e)
可以用setUncaughtExceptionHandler方法为任何线程安装一个处理器。
也可以用Thread类的静态方法setDefaultUncaughtException为所有线程安装一个默认处理器。
替换处理器可以使用日志API发送未捕获的报告到日志文件。
如果不安装默认的处理器,默认的处理器为空。
但是,如果不为独立的线程安装处理器,此时的处理器就是该线程的ThreadGroup对象。
线程组是一个可以统一管理的线程集合。
默认情况下,创建的所有线程属于相同的线程组。
但是,也可能会建立其他的组。
建议不要在自己的程序中使用线程组。
ThreadGroup类实现了Thread.UncaughtExceptionHandler接口。
它的uncaughtException方法如下:
1、如果该线程组有父线程组,那么父线程组的uncaughtException方法被调用;
2、否则,如果Thread.getDefaultExceptionHandler方法返回一个非空的处理器,则调用该处理器;
3、否则,如果Throwable是ThrowDeath的一个实例,什么都不做;
4、否则,线程的名字以及Throwable的栈踪迹被输出到System.err。
14.5 同步
多个线程对同一数据存取。这样一个情况通常称为竞争条件(race condition)。
锁对象:
有两种机制防止代码块受并发访问的干扰。
synchronized关键字
ReentrantLock类
public class Bank
{private Lock bankLock = new ReentrantLock(); //ReentrantLock implements the Lock interfacepublic void transfer( int from, int to, int amount){bankLock.lock()try{accounts[from] -= amount;accounts[to] += amount;}finally{bankLock.unlock();}}
}
锁是可重入的,线程可以重复地获得已经持有的锁。
锁保持一个持有计数(hold count)来跟踪对lock方法的嵌套调用。
线程在每一次调用lock都要调用unlock来释放锁。
被一个锁保护的代码可以调用另一个使用相同的锁的方法,此时锁计数加1。
条件对象:
通常被称为条件变量(conditional variable)。
通常,线程进入临界区,却发现在满足一定条件之后才能执行。
要使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程。
一个锁对象可以有一个或多个相关的条件对象。
可以用newCondition方法获得一个条件对象。习惯上给条件对象命名为可以反映它所表达的条件的名字。
class Bank
{private Condition sufficientFunds;public Bank(){sufficientFunds = banLock.newCondition();}
}
如果不符合条件,调用sufficientFunds.await()方法。
当前线程现在被阻塞了,并放弃了锁。
一旦一个线程调用await方法,它进入该条件的等待集。
直到另一个线程调用同一条件上的signalAll方法时,唤醒等待集的进程。
sufficientFunds.signalAll();//这一调用重新激活因为这一条件而等待的所有线程。
从await调用返回,获得锁并从被阻塞的地方继续执行。
此时,线程应该再次测试该条件。
通常,对await的调用应该在如下形式的循环体中
while (!(ok to proceed))
condition.await();
至关重要的是最终需要某个其它线程调用signalAll方法。
另一个方法signal,随机解除等待集中某个线程的阻塞状态。
小结:
·锁用来保护代码片段,任何时刻只能有一个线程执行被保护的代码;
·锁可以管理试图进入被保护代码段的线程;
·锁可以拥有一个或多个相关的条件对象;
·每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。
synchronized关键字;
Lock和Condition接口为程序设计人员提供了高度的锁定控制。
然而,在大多数情况下,并不需要那样的控制,并且可以使用一种嵌入到Java语言内部的机制。
Java中的每一个对象都有一个内部锁。
如果一个方法用synchronized关键字声明,那么对象锁将保护整个方法。也就是说,要调用该方法,线程必须获得内部的对象锁。
public synchronized void method()
{method body
}
等价于
public void method()
{this.intrinsicLock.lock();try{method body}finally { this.intrinsicLock.unlock(); }
}
内部对象锁只有一个相关条件。
wait方法添加一个线程到等待集中,notifyAll/notify方法解除等待线程的阻塞状态。
换句话说,调用wait或notifyAll等价于
intrinsicCondition.await();
intrinsicCondition.signalAll();
wait、notifyAll以及notify方法是Object类中的final方法。
Condition方法必须被命名为await、signalAll和signal以便它们不会发生冲突。
class Bank
{private double[] accounts;public synchronized void transfer(int from, int to, int amount) throws InterruptedException{while(accounts[from] < amount)wait();accounts[from] -= amount;accounts[to] += amount;notifyAll();}
}
每一个对象有一个内部锁,并且该锁有一个内部条件。
由锁来管理那些试图进入synchronized方法的线程,由条件来管理那些调用wait的线程。
将静态方法声明为synchronized也是合法的。如果调用这种方法,该方法获得相关类对象的内部锁。
如果Bank类有一个静态同步方法,那么当该方法被调用时,Bank.class对象的锁被锁住。没有其他线程可以调用同一个类的这个或其他的同步静态方法。
内部锁和条件的局限:
·不能中断一个正在试图获得锁的线程;
·试图获得锁时不能设定超时;
·每个锁仅有单一的条件,可能是不够的;
在代码中该使用哪种锁?Lock和Condition对象还是同步方法?
·最好既不使用Lock/Condition也不使用synchronized关键字。在很多情况下可以使用java.util.concurrent包中的一种机制,它会处理所有的加锁;
·如果synchronized关键字适合程序,那么尽量使用它,这样可以减少编写的代码数量,减少出错几率;
·如果特别需要Lock/Condition结构提供的独有特性时,才使用Lock/Condition。
同步阻塞:
线程可以通过调用同步方法获得锁。
还有另一种机制可以获得锁,通过进入一个同步阻塞。
当线程进入如下形式的阻塞:
synchronized (obj) //this is the syntax for a synchronized block
{critical section
}
于是它获得obj的锁。
public class Bank
{private double[] accounts;private Object lock = new Object();public void transfer( int from, into to, int amount){synchronized (lock) // an ad-hoc lock{accounts[from] -= amount;accounts[to] += amount;}}
}
有时程序员使用一个对象的锁来实现额外的原子操作,实际上称为客户端锁定(client-side locking)。
public void transfer(Vector<Double> accounts, int from, int to, int amount)
{synchronized (accounts){accounts.set(from, accounts.get(from) - amount);accounts.set(to, accounts.get(to) + amount);}
}
这个方法完全依赖于这样一个事实,Vector类对自己的所有可修改方法都使用内部锁。
监视器概念:
锁和条件是线程同步的强大工具,但是,严格地讲,它们不是面向对象的。
多年来,研究人员努力寻找一种方法,可以在不需要程序员考虑如何加锁的情况下,就可以保证多线程的安全性。
最成功的的解决方案之一是监视器(monitor)。
监视器具有如下特性:
·监视器是只包含私有域的类;
·每个监视器类的对象有一个相关的锁;
·使用该锁对所有的方法进行加锁;
·该锁可以有任意多个相关条件。
每一个条件变量管理一个独立的线程集。
Volatile域:
同步格言:如果向一个变量写入值,而这个变量接下来可能会被另一个线程读取,或者,从一个变量读值,而这个变量可能是之前被另一个线程写入的,此时必须使用同步。
volatile关键字为实例域的同步访问提供了一种免锁机制。
如果声明一个域为volatile,那么编译器和虚拟机就该知道该域是可能被另一个线程并发更新的。
原子性:
java.util.concurrent.atomic包中有很多类使用了高效的机器级指令(而不是使用锁)来保证其他操作的原子性。
如,AtomicInteger类提供了方法incrementAndGet和decrementAndGet,它们分别以原子的方式将一个整数自增或自减。可安全的使用AtomicInteger作为共享计数器而无需同步。
还有AtomicBoolean、AtomicLong、AtomicReference以及Boolean值、整数、long值和引用的原子数组。
应用程序员不应该使用这些类,仅供开发并发工具的系统程序员使用。
死锁:
deadlock
线程局部变量:
使用ThreadLocal辅助类可为各个线程提供各自的实例。
public static final ThreadLocal<SimpleDateFormat> dateFormat =
new ThreadLocal<SimpleDateFormat>()
{protected SimpleDateFormat initialValue(){return new SimpleDateFormat(“yyyy-MM-dd”);}
}
要访问具体的格式化方法,可以调用String dateStamp = dateFormat.get().format(new Date());
在一个线程中首次调用get时,会调用initialValue方法,此后,get方法会返回属于当前线程的那个实例。
在多线程中生成随机数也存在类似问题。
java.util.Random类是线程安全的。
但是如果多个线程需要等待一个共享的随机数生成器,会很低效。
可以使用ThreadLocal辅助类为各个线程提供一个单独的生成器。
不过JavaSE7还提供了一个便利类。只需做以下调用:
int random = ThreadLocalRandom.current().nextInt(upperBound);
ThreadLocalRandom.current() 调用会返回特定于当前线程的Random实例。
锁测试与超时:
线程在调用lock方法来获得另一个线程所持有的锁的时候,很可能发生阻塞。
应该更加谨慎的申请锁。
tryLock方法试图申请一个锁,在成功获得锁后返回true,否则,立即返回false,而且线程可以立即离开去做其他事情。
if (myLock.tryLock())//now the thread owns the locktry { ... }finally { myLock.unlock()}
else//do something else
可以调用tryLock时,使用超时参数:if (myLock.tryLock(100, TimeUnit.MILLISECONDS)) ...
TimeUnit是一个枚举类型,取值包括SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。
lock方法不能被中断。如果一个线程在等待获得一个锁时被中断,中断线程在获得锁之前一直处于阻塞状态。如果出现死锁,那么,lock方法就无法终止。
然而,如果调用带有超时参数的tryLock,那么如果线程在等待期间被中断,将抛出InterruptedException。这是一个非常有用的特性,因为允许程序打破死锁。
也可调用lockInterruptibly方法。相当于一个超时设为无限的tryLock方法。
在等待一个条件时,也可以提供一个超时:
myCondition.await(100, TimeUnit.MILLISECONDS)
如果一个线程被另一个线程通过调用signalAll或signal激活,或者超时时限已达到,或者线程被中断,那么await方法将返回。
如果等待的线程被中断,await方法将抛出一个InterruptedException。
当希望线程中断后继续等待,可使用awaitUninterruptibly方法代替await。
读写锁:
java.util.concurrent.locks包定义了两个锁类:
ReentrantLock类和ReentrantReadWriteLock类
很多线程从一个数据结构读取数据,很少线程修改其中的数据用ReentrantReadWriteLock。
写线程互斥访问。
1、构造一个ReentrantReadWriteLock对象;
private ReentrantReadWriteLocck rwl = new ReentrantReadWriteLock();
2、抽取读锁和写锁;
private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();
3、对所有的获取方法加读锁;
public double getTotalBalance()
{readLock.lock()try{ ... }finally { readLock.lock(); }
}
4、对所有的修改方法加写锁;
public void transfer( ... )
{writeLock.lock();try { ... }finally { writeLock.unlock(); }
}
14.6 阻塞队列
前面介绍了Java并发程序设计的底层构建块。
对于实际编程来说,应该尽可能远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要方便的多、安全的多。
不需要显式的线程同步,使用队列作为一种同步机制。
对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。
生产线者程向队列插入元素,消费者线程则取出它们。
使用队列,可以安全的从一个线程向另一个线程传递数据。
当试图向满队列添加元素,或从空队列移出元素时,阻塞队列(blocking queue)导致线程阻塞。
在协调多个线程之间的合作时,阻塞队列是一个有用的工具。
阻塞队列方法分3类:
put | 添加元素 | 满,阻塞 |
take | 移出并返回头元素 | 空,阻塞 |
add | 添加元素 | 满,抛出IllegalStateException |
element | 返回队列头元素 | 空,抛出NoSuchElementException |
remove | 移出并返回 | 空,抛出NoSuchElementException |
offer | 添加元素并返回true | 满,返回false |
peek | 返回队列头元素 | 空,返回null |
poll | 移出并返回队列头元素 | 空,返回null |
还有带有超时的offer方法和poll方法的变体。
boolean sucess = q.offer(x, 100, TimeUnit.MILLISECONDS);
//在100毫秒尾插,成功返回ture;失败返回false。
Object head = q.poll(100, TimeUnit.MILLISECONDS)
//尝试100毫秒内移出队列的头元素,成功返回头元素,失败返回null。
java.util.concurrent包提供了阻塞队列的几个变种:
默认情况下,LinkedBlockingQueue的容量是没有上边界的,但是也可以选择指定最大容量。
LinkedBlockingDeque是一个双端的版本。
ArrayBlockingQueue在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则等待时间最长的线程会优先得到处理。通常,公平性会降低性能,只有在确实非常需要时才使用它。
PriorityBlockingQueue是一个带优先级的队列。元素按优先级被移出,没有容量上限。
DelayQueue包含实现Delayed接口的对象:
interface Delayed extends Comparable<Delayed>
{
long getDelay(TimeUnit unit);
}
getDelay方法返回对象的残留延迟。负值表示已经结束。
元素只有在延迟用完的情况下才能从DelayQueue移出。
还必须实现compareTo方法。DelayQueue使用该方法对元素进行排序。
JavaSE7增加了一个TransferQueue接口,允许生产者线程等待,直到消费者准备就绪可以接受一个元素。如果生产者调用q.transfer(item);这个调用会阻塞,直到另一个线程将元素(item)删除。
LinkedTransferQueue类实现了这个接口。
14.7 线程安全集合
如果多线程要并发地修改一个数据结构,如散列表,那么很容易会破坏这个数据结构。
可以通过提供锁来保护共享数据结构,但是选择线程安全的实现作为替代可能更容易。
高效的映射表、集合、队列:
java.util.concurrent包提供了ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue。
size方法不在常量时间内操作,需遍历。
并发的散列映射表,可以高效的支持大量地读者和一定数量的写者。
默认16个写者,超过16个则阻塞。
ConcurrentHashMap、ConcurrentSkipListMap有相应的方法用于插入和删除:
cache.putIfAbsent(key, value);
cache.remove(key, value);
cache.replace(key, oldValue, newValue);
写数组的拷贝:
CopyOnWriteArrayList和CopyOnWriteArraySet是线程安全集合。
所有修改线程对底层数组进行复制。
如果在集合上进行迭代的线程数超过修改线程数,这样的安排是很有用的。
当构建一个迭代器时,它包含一个对当前数组的引用。
如果数组后来被修改了,迭代器仍然引用旧数组,但是,集合的数组已经被替换了。
旧的迭代器拥有一致的(可能过时的)视图,访问它无需任何同步开销。
较早的线程安全集合:
Vector和Hashtable类提供了线程安全的动态数组和散列表。
现在被弃用。
代之的是ArrayList和HashMap类,但不是线程安全的。
可以使用同步包装器(synchronization wrapper)变成线程安全的:
List<E> synchArrayList = Collections.synchronizedList( new ArrayList<E>());
Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());
最好使用java.util.concurrent包中定义的集合,不使用同步包装器中的。
有一个例外是经常被修改的数组列表,咋那种情况下,同步的ArrayList可以胜过CopyOnWriteArrayList
14.8 Callable与Future
Runnable封装一个异步运行的任务,无返回值和参数;
Callable与Runnable类似,但是有返回值;
public interface Callable<T>
{
T call() throws Exception;
}
Future保存异步计算的结果,可以启动一个计算,将Future对象交给某个线程,然后忘掉它。
Future对象的所有者在结果计算好之后就可以获得它。
public interface Future<V>
{
V get() throws ...;
V get(long timeout, TimeUnit unit) throws ...;
void cancel( boolean mayInterrupt);
boolean isCancelled();
boolean isDone();
}
第一个get方法的调用被阻塞,直到计算完成;
如果在计算完成之前,第二个方法的调用超时,抛TimeoutException。
如果运行该计算的线程被中断,两个get方法都抛InterruptedException。
如果计算完成,那么get方法立即返回。
如果计算还在进行,isDone方法返回false;如果完成了,返回ture;
可以用cancel方法取消该计算。如果计算未开始,它被取消且不再开始。如果计算处于运行中,那么如果mayInterrupt参数为true,它就被中断。
FutureTask包装器是一种非常便利的机制,可将Callable装换成Future和Runnable,它同时实现二者的接口。
Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task); //it’s a Runnable
t.start();
...
Integer result = task.get(); //it’s a Future
14.9 执行器
构建一个新的线程有一定代价,因为涉及与操作系统的交互。
如果程序创建了大量的生命周期很短的线程,应该使用线程池(thread pool)。
一个线程池中包含许多准备运行的空闲线程。
将Runnable对象交给线程池,就会有一个线程调用run方法。
当run方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。
另一个使用线程池的理由是减少并发线程的数目。
执行器(Executor)类有许多静态工厂方法来构建线程池:
newCachedThreadPool 必要时创建新线程;空闲线程会被保留60秒
newFixedThreadPool 包含固定数量的线程
newSingleThreadExecutor 只有一个线程,该线程顺序执行每一个提交的任务
newScheduledThreadPool 用于预定执行而构建的固定线程池,替代java.util.Timer
newSingleThreadScheduledExecutor 用于预定执行而构建的单线程池
前三个方法返回实现了ExecutorService接口的ThreadPoolExcecutor类的对象。
将一个Runnable对象或Callable对象提交给ExecutorService:
Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result)
Future<T> submit(Callable<T> task)
该线程池调用submit时,会得到一个Future对象,可以用来查询该任务的状态。
第一个submit方法,get方法在完成的时候只简单的返回null;
第二个submit方法,get方法在完成的时候返回指定的result对象;
第三个submit方法,同Future用法。
当完成一个线程池的时候,调用shutdown。线程池不再接受新任务,所有任务完成后,线程池中的线程死亡。
另一种方法时调用shutdownNow,取消尚未开始的所有任务并中断正在运行的线程。
线程池该做的事:
1、调用Executors类中的静态的方法newCachedThreadPool或newFixedThreadPool;
2、调用submit提交Runnable或Callable对象;
3、如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象;
4、当不再提交任务时,调用shutdown。
预定执行:
ScheduledExecutorService接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。它是一种允许使用线程池机制的java.util.Timer的泛化。
Executors类的newScheduledThreadPool和newSingleThreadScheduleExecutor方法将返回实现了ScheduledExecutorService接口的对象。
可以预定Runnable或Callable在初始的延迟之后只运行一次
也可以预定一个Runnable对象周期性地运行。
控制任务组:
有时,使用执行器的原因是,控制一组相关任务。
例如,可以在执行器中使用shutdownNow方法取消所有的任务。
invokeAny方法提交一个Callable对象集合中的所有对象,并返回一个Future对象的列表,代表所有任务的解决方案。无法知道返回的究竟是那个任务的结果,也许是最先完成的那个任务的结果。对于搜索问题,如果你愿意接受任何一种解决方案的话,就可以使用这个方法。
例如需要一个大整数进行因数分解计算来解码RSA密码。可以提交很多任务,每一个任务使用不同范围内的数来进行分解。只要其中一个任务得到了答案,计算就可以停止了。
List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results)
processFuther( result.get());
可以将结果按可获得的顺序保存起来。
一个更有效的组织如下:
ExecutorCompletionService service = new ExecutorCompletionService(executor);
for (Callable<T> task : tasks )
service.submit(task);
for ( int i = 0; i < tasks.size(); ++i)
processFurther( service.take().get() );
Fork-Join框架:
对于一些应用,可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像或视频处理。javase7引入了fork-join框架,来支持这种应用。
假设有一个处理任务,可以自然地分解为子任务:
if (problemSize < threshold)
solve problem directly
else
{
break down into subproblems
recursively solve each subproblem
combine the results
}
如果计算会生成一个类型为T的结果,需扩展RecursiveTask<T>类;
如果不产生任何结果,需扩展RecursiveAction类。
再覆盖compute方法调用子任务,然后合并其结果。
class Counter extends RecursiveTask<Integer>
{
...protected Integer compute(){if (to - from < THRESHOLD)solve problem directly;else{int mid = (from + to) / 2;Counter first = new Counter(values, from, mid, filter);Counter second = new Counter(values, mid, to, filter);invokeAll(first, second);return first.join() + second.join();}}
}
在这里,invokeAll方法接收到很多任务并阻塞,直到所有这些任务都已经完成。
join方法将生成结果。
在后台,fork-join框架使用了工作密取(work stealing)方法来平衡可用线程的工作负载。
14.10 同步器
java.util.concurrent包包含了几个管理线程集的类。
这些机制具有为线程之间的共用集结点模式(common rendezvous patterns)提供的预置功能(canned functionality)。
如果有一个相互合作的线程集满足这些行为模式之一,那么应该直接重用合适的库类,而不要试图提供手工的锁与条件的集合。
类 | 功能 | 何时使用 |
CyclicBarrier | 允许线程集等待,直至其中预定数目的线程到达一个公共障栅(barrier),然后可以选择执行一个处理障栅的动作 | 当大量线程需要在它们的结果可用之前完成时。 |
CountDownLatch | 允许线程集等待直到计数器减为0 | 当一个或多个线程需要等待直到指定数目的事件发生 |
Exchanger | 允许两个线程在要交换的对象准备好时交换对象 | 当两个线程工作在同一数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据。 |
Semaphore | 允许线程集等待直到被允许继续运行为止 | 限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止 |
SynchronousQueue | 允许一个线程把对象交给另一个线程 | 在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时 |
CyclicBarrier类:
实现了一个集结点(rendezvous)称为障栅(barrier)。
考虑大量线程运行在一次计算的不同部分的情形。
当所有部分都准备好时,需要把结果组合在一起。
当一个线程完成了它的那部分任务后,则让他运行到障栅处。
一旦所有的线程都达到了这个障栅,障栅就撤销,线程就可以继续运行。
CyclicBarrier barrier = new CyclicBarrier(nthreads);
每一个线程做一些工作,完成后在障栅上调用await:
public void run()
{
doWork();
barrier.await();
}
await方法有一个可选的超时参数:
barrier.await(100, TimeUnit.MILLISECONDS);
如果任何一个在障栅上等待的线程离开了障栅,那么障栅就被破坏了(线程可能离开时因为它调用await时设置了超时,或者因为它被中断了)。这种情况下,所有其他线程的await方法抛出BrokenBarrierException。那些已经在等待的线程立即终止await调用。
可以提供一个可选的障栅动作(barrier action),当所有线程到达障栅的时候就会执行这一动作。
Runnable barrierAction = ...
CycliBarrier barrier = new CyclicBarrier(nthreads, barrierAction);
这个动作可以收集那些单个线程的运行结果。
障栅是循环的,可以在所有等待线程释放后被重用。
CountDownLatch 倒计时门栓
让一个线程集等待直到计数器变为0。
一次性,一旦为0就不能再重用了。
一个有用的特例是计数值为1的门栓。假定A线程需要数据,被启动且在门外等候;B线程准备数据,当数据准备好时,调用countDown,A线程就可继续运行了。
Exchanger 交换器
一个线程向缓冲区填入数据,另一个线程消耗这些数据。
当它们都完成以后,相互交换缓冲区。
Semaphore 信号量
acquire请求许可 release释放许可