Java多线程编程(四)

本文重点学习了java.util.concurrent包内的重点方法。JKD1.5增加了并发包,为并发编程提供了非常大的帮助,让我们更容易地使用并发技术。其中的阻塞队列、线程池、原子类、信号量等技术特别重要,值得我们深入学习。

线程池

Java使用线程完成异步任务是很普遍的事,而线程的创建与销毁需要一定的开销,如果每个任务都需要创建一个线程将会消耗大量的计算资源,JDK 5之后把工作单元和执行机制区分开了,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供。Executor框架为线程的启动、执行和关闭提供了便利,底层使用线程池实现。使用Executor框架管理线程的好处在于简化管理、提高效率,还能避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。
线程池的优点主要有如下几个方面:

  1. 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
  2. 可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,服务器负载过重导致难以预料的后果(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executor定义如下:

1
2
3
public interface Executor {
void execute(Runnable command);
}

虽然Executor是一个简单的接口,但却为灵活的任务执行框架打下了基础。Executor接收一个Runnable来执行任务。真正实现的方法ExecutorService中提供了对生命周期、统计信息、程序管理与监视等功能。Executor基于生产者消费者模式,提交任务的操作相当于生产者,而执行任务的操作相当于消费者。如果要实现生产者消费者模式,那么Executor是一个最简单的方式。
ExecutorService的生命周期 包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

创建线程池

Executor一般是这样使用的:首先调用Executors4个静态工厂方法之一来创建线程池,然后向线程池中提交任务,再调用execute()方法来执行。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ExecutorDemo {
public static void main(String[] args) {
//调用Executors静态工厂方法
//ExecutorService es = Executors.newCachedThreadPool();
//ExecutorService es = Executors.newSingleThreadExecutor();
ExecutorService es = Executors.newFixedThreadPool(2);
//Runnable线程
Runnable thread = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
};
for (int i = 0; i < 5; i++)
//执行线程
es.execute(thread);
es.shutdown();
}
}

newScheduledThreadPool与上一个例子略有不同。执行的是scheduleAtFixedRate方法,其中的参数分别为线程、延迟、重复时间和时间单位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ScheduledThreadPoolExecutorDemo {
public static void main(String[] args) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
exec.scheduleAtFixedRate(new Runnable() {// 每隔一段时间就触发异常
@Override
public void run() {
// throw new RuntimeException();
System.out.println("================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable() {// 每隔一段时间打印系统时间,证明两者是互不影响的
@Override
public void run() {
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}

}

下面来详细说一下创建线程的4个工厂方法:

  • newSingleThreadExecutor
    创建一个单线程的Executor。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    1
    2
    public static ExecutorService newSingleThreadExecutor()
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

  • newFixedThreadPool
    创建固定数目线程的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    1
    2
    public static ExecutorService newFixedThreadPool(int nThreads)
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

  • newCachedThreadPool
    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    1
    2
    public static ExecutorService newCachedThreadPool()
    public static ExecutorService newCachedThreadPool(ThreadFactory factory)

  • newScheduledThreadPool
    创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    1
    2
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory factory)

一般来说,CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。

ThreadPoolExecutor方法

Executors4大静态方法的内部都是调用了ThreadPoolExecutor这个方法,ThreadPoolExecutor是Executors类的底层实现。。完整的构造方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{

if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

方法的形参多达7个,它们的含义是这样的:

  • corePoolSize :池中所保存的线程数,包括空闲线程,是线程池的基本大小。
  • maximumPoolSize:池中允许的最大线程数。
  • keepAliveTime: 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit:keepAliveTime 参数的时间单位。
  • workQueue :执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。
  • threadFactory:执行程序创建新线程时使用的工厂。
  • handler :由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法将一个Runnable任务添加到线程池中时,按照如下顺序来处理:

  1. 如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
  2. 如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
  3. 如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
  4. 如果线程池中的线程数量等于了maximumPoolSize,有4种处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

我们再来看一看创建线程池的源码:
newSingleThreadExecutor直接指定线程池有且只有一个线程,并利用LinkedBlockingQueue这个阻塞队列。LinkedBlockingQueue有一个特点,它是无界的,具体的涵义接下来研究。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newFixedThreadPool指定线程有nThreads个线程,可以看做nThreads*newSingleThreadExecutor,线程数量固定,不会动态的扩大。keepAliveTime这个参数为0,说明线程只要空闲下来,马上就会被移除。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

接下来看看newCachedThreadPool。这个方法和上两个有很大的不同。指定的线程数量为Integer.MAX_VALUE,然后时间限制为60秒(线程空闲超过60s将会被移除),还有使用的是SynchronousQueue这个有界队列。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

最后是newScheduledThreadPool。可以看到,调用的是DelayedWorkQueue这个队列。这个线程池中调用的任务是ScheduledFutureTask,线程池会把任务放到DelayedWorkQueue中,线程池从中获取要执行的线程并执行。

1
2
3
4
5
6
7
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}

执行Callable任务

一共有三种方式创建线程:重写Thread类、实现Runnable接口,再一个就是实现Callable接口了。Callable接口执行任务后具有返回值。而且,Callable 的call()方法 只能 通过 ExecutorService 的 submit(Callable task) 方法来执行,并且返回一个 Future(目前是FutureTask)。
Future表示一个任务的生命周期,并提供了响应的方法来判断是否已经完成或者取消,以及获取任务结果和取消任务等。
Callable和Future的代码清单如下所示:

1
2
3
4
5
6
7
8
9
10
11
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

}

Future的get方法的行为取决于任务的状态,如果任务已经完成,那么get会立即返回或者抛出一个Exception;如果任务还未完成,那么get将 阻塞 直到任务完成。如果get抛出了ExecutionException,那么可以通过getCause来获取被封装的原始异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CallableDemo {
static class CallableTask implements Callable<String> {
private String name;
Random random = new Random();
public CallableTask(String name) {
this.name = name;
}
@Override
public String call() throws Exception {
int sleepTime = random.nextInt(4);
System.out.println(Thread.currentThread().getName() + " sleep " + sleepTime);
TimeUnit.SECONDS.sleep(sleepTime);
return Thread.currentThread().getName() + " wake up";
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
List<Future<String>> list = new ArrayList<Future<String>>();
for (int i = 0; i < 5; i++) {
Future<String> f = es.submit(new CallableTask((i + "")));
list.add(f);
}
for (final Future<String> f : list) {
new Thread (){
@Override
public void run() {
try {
System.out.println(f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}.start();
}
TimeUnit.SECONDS.sleep(6);
es.shutdown();
}
}

从这个例子的运行结果来看,每个Future的get方法都在阻塞,直到线程完成操作并返回结果。

阻塞队列BlockingQueue

阻塞队列是JDK1.5中随着J.U.C包中出现的内容,统一接口是java.util.concurrent.BlockingQueue,它有多个实现类:ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,前四种使用较多。
主要的组织排队方法有3种:
直接提交 。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界maximumPoolSizes以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列 。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列 。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

阻塞队列支持如下方法:

  • 插入方法:add(e)(添加失败会抛出异常)、offer(e)(添加失败返回特殊值)、put(e)(添加失败会一直阻塞)
  • 移除方法:remove(e)(移除失败会抛出异常)、poll(e)(移除失败会返回特殊值)、take(e)(移除失败会一直阻塞)

以ArrayBlockingQueue为例,来写一个经典的 生产者-消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.concurrent.*;
public class ProducerAndConsumer {
//生产的物品
static class Thing {
String name;
public Thing(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
//生产者生产物品后放入阻塞队列中
static class Producer implements Runnable {
private final BlockingQueue<Thing> blockingQueue;
public Producer(BlockingQueue<Thing> blockingQueue) {
this.blockingQueue = blockingQueue;
}
private boolean flag;
@Override
public void run() {
while (!Thread.interrupted()) {
if (flag) {
blockingQueue.add(new Thing("A"));
System.out.println("produce A");
flag = false;
} else {
blockingQueue.add(new Thing("B"));
System.out.println("produce B");
flag = true;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
System.out.println("Producer over");
}
}
//消费者消费物品
static class Consumer implements Runnable {
private final BlockingQueue<Thing> blockingQueue;
public Consumer(BlockingQueue<Thing> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
System.out.println("consumer" + blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Consumer over");
}
}

public static void main(String[] args) throws InterruptedException {
BlockingQueue<Thing> blockingQueue = new ArrayBlockingQueue<>(4);
Thread producer = new Thread(new Producer(blockingQueue));
Thread consumer = new Thread(new Consumer(blockingQueue));
producer.start();
TimeUnit.SECONDS.sleep(3);
consumer.start();
TimeUnit.SECONDS.sleep(3);
producer.interrupt();
}
}

CompletionService异步处理

当使用ExecutorService来创建Callable任务,相应的就应该使用Future来获取返回值。每个Callable会产生一个Future,我们需要把Future依次放入线性表中,依次的get结果。如果第一个任务执行时间较长,一直阻塞,而后面的任务早就完成了,那效率就会非常的低下。在这种应用背景下,CompletionService诞生了。CompletionService是一个高级的ExecutorService,本身自带了一个线程安全的线性表,每当Future得到结果后自动存入表中,这样,客户端就可以按照完成的时间顺序来得到结果,效率大大提高。
CompletionService的源码:

1
2
3
4
5
6
7
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

submit可以接收Callable和Runnable参数,返回一个表示任务完成的Future。take和poll都是取出Future,但take是阻塞等待,而poll如果不成功则返回一个null。

ExecutorCompletionService

CompletionService可以说是executor和BlockingQueue的集合体。再来研究一下实现这个接口的ExecutorCompletionService的代码:

1
2
3
4
5
6
7
8
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

构造方法将一个Executor传入进来,如果未指定阻塞队列,将会new一个无界队列LinkedBlockingQueue。当任务完成后,计算后返回的数据就放在这个队列中。
再来看一看内部类QueueingFuture:

1
2
3
4
5
6
7
8
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

这个类继承FutureTask,作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
这是一个使用的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CompletionServiceDemo {
//Callable Task
static class Task implements Callable<String> {
private Random random = new Random();
@Override
public String call() throws Exception {
long time = random.nextInt(5);
System.out.printf("%s will sleep %d seconds\n", Thread.currentThread().getName(), time);
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName() + " had sleep " + time + " second";
}
}

public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
CompletionService<String> cs = new ExecutorCompletionService<>(es);
try {
for (int i = 0; i < 5; i++) {
cs.submit(new Task());
TimeUnit.MILLISECONDS.sleep(10);
}
for (int i = 0; i < 5; i++) {
System.out.println(cs.take().get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
es.shutdown();
}
}
}

CompletionService小结

  1. 相比ExecutorService,CompletionService可以更精确和简便地完成异步任务的执行
  2. CompletionService的一个实现是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果
  3. 在执行大量相互独立和同构的任务时,可以使用CompletionService
  4. CompletionService可以为任务的执行设置时限,主要是通过BlockingQueue的poll(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务

参考
并发编程大合集
Java并发编程系列
CompletionService 简介