Java多线程编程(五)

我们将继续学习J.U.C的功能。
Java并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch、Semaphore和CyclicBarrier。

Fork/Join框架

Java7中引入了Fork/Join框架,为并发编程提供了一个非常好的选择。字面意思来讲,Fork是拆分,Join是合并,而这就是这个框架最显著的特质。Fork/Join框架把任务拆分后,采用分治法,当一个线程正在等待他创建的子线程运行的时候,当前线程如果完成了自己的任务后,就会寻找还没有被运行的任务并且运行他们,这样就是和Executors这个方式最大的区别,更加有效的使用了线程的资源和功能。所以非常推荐使用Fork/Join框架。

工作窃取算法

工作窃取算法是指线程从其他任务队列中窃取任务执行。考虑下面这种场景:有一个很大的计算任务,为了减少线程的竞争,会将这些大任务切分为小任务并分在不同的队列等待执行,然后为每个任务队列创建一个线程执行队列的任务。那么问题来了,有的线程可能很快就执行完了,而其他线程还有任务没执行完,执行完的线程与其空闲下来不如帮助其他线程执行任务,这样也能加快执行进程。所以,执行完的空闲线程从其他队列的尾部窃取任务执行,而被窃取任务的线程则从队列的头部取任务执行(这里使用了双端队列,既不影响被窃取任务的执行过程又能加快执行进度)。
从以上的介绍中,能够发现工作窃取算法的优点是充分利用线程提高并行执行的进度。当然缺点是在某些情况下仍然存在竞争,比如双端队列只有一个任务需要执行的时候。
ForkJoinPool使用守护线程(deamon)来执行任务,因此无需对他显示的调用shutdown()来关闭。一般情况下,一个程序只需要唯一的一个ForkJoinPool,因此应该按如下方式创建它:

static final ForkJoinPool mainPool = new ForkJoinPool(); //线程的数目等于CPU的核心数

使用Fork/Join框架

使用Fork/Join框架分为两步:

  1. 分割任务:首先需要创建一个ForkJoin任务,执行该类的fork方法可以对任务不断切割,直到分割的子任务足够小
  2. 合并任务执行结果:子任务执行的结果同一放在一个队列中,通过启动一个线程从队列中取执行结果。

来看一个最简单的Fibonacci数列的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ForkJoinDemo {
static final ForkJoinPool forkJoinPool = new ForkJoinPool();
static class Fibonacci extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
final int n;
public Fibonacci (int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
Future<Integer> result = forkJoinPool.submit(new Fibonacci(10));
System.out.println(result.get());
}
}

代码中使用了FokJoinTask,其与一般任务的区别在于它需要实现compute方法,在方法需要判断任务是否在阈值区间内,如果不是则需要把任务切分到足够小,直到能够进行计算。每个被切分的子任务又会重新进入compute方法,再继续判断是否需要继续切分,如果不需要则直接得到子任务执行的结果,如果需要的话则继续切分,如此循环,直到调用join方法得到最终的结果。
可以发现Fork/Join框架的需要把提交给ForkJoinPool,ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,前者负责将存放程序提交给ForkJoinPool的任务,后者则负责执行这些任务。关键在于在于fork方法与join方法。
暂时了解Fork/Join框架,具体的源码以后有时间再研究研究。

参考
使用Java7提供的Fork/Join框架
Java线程之fork/join框架

CountDownLatch闭锁

闭锁CountDownLatch也是J.U.C包中的一个重要的特性。闭锁是一种同步工具,可以延迟线程的进度直到其达到终止状态,也就是说 CountDownLatch能够使一个线程等待其他线程完成各自的工作后再执行 。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。更通俗的说就是可以设定线程的先后顺序,一些执行后另一些在执行。
CountDownLatch的应用场景:

  1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用一次countDown()方法就可以让所有的等待线程同时恢复执行。
  2. 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
  3. 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

CountDownLatch内部原理

  1. 构造方法设置了计数器count的值:

    1
    2
    3
    4
    public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
    }
  2. CountDownLatch主要是通过Sync这个内部类来控制线程。Sync实现了AbstractQueuedSynchronizer,AbstractQueuedSynchronizer是非常重要的一个抽象类,关于这个抽象类的特性以后再来学习。Sync主要就是重写了tryAcquireShared和tryReleaseShared这个方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
    }
    protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
    int c = getState();
    if (c == 0)
    return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
    return nextc == 0;
    }
    }

可以看出,tryReleaseShared在不停的忙等待,如果状态为0,那么返回false;否则计数器减一,如果compareAndSetState不成功,那么继续循环。

  1. await()和countDown()的实现
    这两个方法是CountDownLatch最重要的业务方法,分别是等待和计数器减一。
    1
    2
    3
    4
    5
    6
    sync.acquireSharedInterruptibly(1);
    -->if (tryAcquireShared(arg) < 0)//调用上面的tryAcquireShared()方法
    doAcquireSharedInterruptibly(arg);//加入到等待队列中
    sync.releaseShared(1);
    --> if (tryReleaseShared(arg))//调用tryReleaseShared()方法
    doReleaseShared();//解锁

此外,await还有一个重载方法await(long time, TimeUnit unit): CountDownLatch 内部计数器到达0或者特定的时间过去了之后启动。

CountDownLatch的使用

使用CountDownLatch类有3个步骤:

  1. 初始值决定CountDownLatch类需要等待的事件的数量。
  2. await() 方法, 被等待全部事件终结的线程调用。
  3. countDown() 方法,事件在结束执行后调用。

当创建CountDownLatch对象,使用构造函数的参数来初始化内部计数器。每次调用countDown方法,计数器减一。当计数器为0,唤醒全部await的线程。注意,这个计数器的值只能通过countDown方法来更改,而不能通过显示的设置来更改。因此,可以说CountDownLatch对象是一次性的,使用一次后必须重新创建新的对象才能正常使用。
这是一个使用CountDownLatch的一个典型的例子。会议准备后,成员们入场。所有人都来了后会议再开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class CountDownLatchDemo {
//会议,在所有成员到来之后才能开始
static class Conference implements Runnable{
private final CountDownLatch latch;
//初始化闭锁
public Conference(int num) {
latch = new CountDownLatch(num);
}
//arrive一个人后,countDown
public void arrive(String name) {
System.out.println(name + " arrives.");
//每完成一次活动,计数器减一
latch.countDown();
System.out.println("remain " + latch.getCount() + " persons.");
}
//重写run方法,调用await方法
@Override
public void run() {
System.out.println("Conference: There will be " + latch.getCount() + " participants.");
try {
//等待计数器归零
latch.await();
System.out.println("Conference: All the participants have come");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//成员类
static class Participant implements Runnable {
private Conference conference;
private String name;
//传入闭锁和成员名字
public Participant(Conference conference, String name) {
this.conference = conference;
this.name = name;
}
//睡眠一段时间后通知闭锁
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep((long)(Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
conference.arrive(name);
}
}
//先设置闭锁的数目,然后创建成员来实现任务。
public static void main(String[] args) {
Conference conference = new Conference(10);
new Thread(conference).start();
for (int i = 0; i < 10; i++) {
Participant participant = new Participant(conference, "participant" + i);
new Thread(participant).start();
}
}
}

参考
Java并发学习之十七——线程同步工具之CountDownLatch
什么时候使用CountDownLatch

CyclicBarrier栅栏

CountDownLatch是闭锁,可以启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦终止就不能被重置。Barrier类似于闭锁,可以阻塞一组线程直到某个事件发生。这两个类十分类似,区别就在于栅栏可以重复使用,而闭锁只能用一次。
通过栅栏可以实现让一组线程等待至某个状态之后再全部同时执行。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

CyclicBarrier原理

首先来看一看构造方法:

1
2
3
4
5
6
7
8
9
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}

参数parties指定等待线程的数量,barrierAction为当前线程都达到了barrier状态时执行的线程。如果不指定,那么所有线程await状态结束后自动向后运行。
可以看到,核心方法await实际上就是调用dowait这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {

//上锁,同一时刻只能有一个await方法执行
final ReentrantLock lock = this.lock;
lock.lock();
try {
//取当前线程的Generation来看障碍点是否被破坏
final Generation g = generation;
//如果已经被破坏,抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
//判断当前线程是否已中断,如果中断则唤醒其他线程并抛出InterruptedException
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//前面判断没问题,则当前的任务数目-1
int index = --count;
//如果任务数目为0,说明达到了障碍点,需要唤醒所有的等待线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
//执行构造方法中传入的线程
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//如果正常执行完毕,则重新创建一个Generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 如果上一步返回值不是0,说明没有达到屏障点,执行自旋
for (;;) {
try {
//timed为定时标志位,如果不需要定时则直接等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
//自旋结束的出口,如果g发生变化,则正常退出
if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

Generation是一个只有一个状态的内部类。为什么要创建这样一个内部类呢?java标准类型是值传递,引用类型是应用传递。因此,要实现可变的标志位(boolean类型),必须使用引用传递,需要一个类对flag进行封装。因此,才会有这样一个内部类。
另外线程持有的generation和全局的generation是否是同一个将被视为自旋的出口,如果是同一个说明当前的批次还没有执行完毕(或者执行出错了),则继续自旋阻塞,如果不一致则说明已经建立下一个屏障,当前批次已经正常执行完毕可正常退出返回对应的到达屏障的索引。

使用栅栏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class CyclicBarrierDemo {
//模拟写入操作
static class WriteData implements Runnable{
private final CyclicBarrier cyclicBarrier;
public WriteData(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is writing.");
try {
TimeUnit.SECONDS.sleep((long) (Math.random() * 5));
System.out.println(Thread.currentThread().getName() + " ends");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
//
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable(){
@Override
public void run() {
System.out.println("所有线程均到达栅栏位置,开始下一轮计算");
}
});
ExecutorService es = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 5; i++) {
es.execute(new WriteData(cyclicBarrier));
}

TimeUnit.SECONDS.sleep(5);
System.out.println("Barrier again.");
//再次使用栅栏
for (int i = 0; i < 5; i++) {
es.execute(new WriteData(cyclicBarrier));
}
} finally {
es.shutdown();
}
}
}

参考
深入浅出Java并发包—CyclicBarrier原理分析(二)
并发新特性—障碍器CyclicBarrier(含代码)

信号量Semaphore

信号量用来控制同时访问某个特定资源的操作数量。
著名的厕所理论是这样介绍信号量的:Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
当需要一个许可证的时候,调用Semaphore的acquire方法,而在使用完许可证需要归还许可证的时候,调用release方法归还。Semaphore还可以使用tryAcquire方法尝试获取许可证。
Semaphore非常适合用于流量控制,特别是公有资源有限的场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。
Semaphore内部也是实现了AbstractQueuedSynchronizer的一个内部类,通过AQS来管理线程。
这是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SemaphoreDemo {
private static ExecutorService es = Executors.newFixedThreadPool(30);
//设定信号量为4,同一时刻只有4个线程执行
private static Semaphore semaphore = new Semaphore(4);
public static void main(String[] args) {
for (int i = 0; i < 30; i++) {
es.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " execute");
TimeUnit.SECONDS.sleep(1);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
es.shutdown();
}
}

这段代码执行时可用很清楚的看到每隔1s执行4个线程,每个线程都遵守这个规则。

交换器Exchanger

Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据。
当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。
这是一个生产者-消费者问题的Exchanger写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ExchangerDemo {
static class Producer implements Runnable {
private List<Integer> list = new ArrayList<>();
private Exchanger<List<Integer>> exchanger;
public Producer(Exchanger<List<Integer>> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
for (int j = 0; j < 5; j++)
list.add((int) (Math.random() * 10));
System.out.println(i + "time producer done, list is " + list);
//写入list后进行交换
list = exchanger.exchange(list);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private List<Integer> list = new ArrayList<>();
private Exchanger<List<Integer>> exchanger;
public Consumer(Exchanger<List<Integer>> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
//阻塞等待Producer写入list
list = exchanger.exchange(list);
System.out.println(i + "time consumer get list " + list);
list.clear();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Exchanger<List<Integer>> exchanger = new Exchanger<>();
new Thread(new Producer(exchanger)).start();
new Thread(new Consumer(exchanger)).start();
}
}

参考
Java线程之Exchanger