详解Java并发流程控制工具
@[toc]
# 写在文章开头
本文将针对JUC包下几个常见的工具类进行深入剖析和演示,通过针对本文的阅读,读者将会对JUC包下的工具有一个全面的了解和运用。
2025.7.3收到读者建议,笔者在篇章末尾对于CyclicBarrier和CountDownLatch进行更进一步的补充并将例子加以完善:

我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# CountDownLatch(倒计时门闩)
# CountDownLatch简介
在并发编程的禅意中,CountDownLatch本质上就是一种闭锁,而闭锁的语义则是等待所有其他活动都完成了,才会继续执行后续的操作。
笔者一般称CountDownLatch为倒计时门闩,它主要用于需要某些条件下才能唤醒的需求场景,例如我们线程1必须等到线程2做完某些事,那么就可以设置一个CountDownLatch并将数值设置为1,一旦线程2完成业务逻辑后,将数值修改为0,此时线程1就会被唤醒:

# 基于CountDownLatch实现等待多线程就绪
通过上述的描述可能有点抽象,我们直接通过几个例子演示一下,我们现在有这样一个需求,希望等待5个线程完成之后,打印输出一句工作完成:

对应的代码示例如下,可以看到我们创建了数值为5的CountDownLatch ,一旦线程池里的线程完成工作后就调用countDown进行扣减,一旦数值变为0,主线程await就会放行,执行后续输出:
int workerSize = 5;
CountDownLatch workCount = new CountDownLatch(workerSize);
ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);
for (int i = 0; i < workerSize; i++) {
final int workerNum = i;
//5个工人输出完成工作后,扣减倒计时门闩数
threadPool.submit(() -> {
log.info("worker[{}]完成手头的工作", workerNum);
workCount.countDown();
});
}
try {
//阻塞当前线程(主线程)往后走,只有倒计时门闩变为0之后才能继续后续逻辑
log.info("等待worker工作完成");
workCount.await();
} catch (InterruptedException e) {
log.info("倒计时门闩阻塞失败,失败原因[{}]", e.getMessage(), e);
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
log.info("所有工人都完成手头的工作了");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
对应的我们也给出输出结果,可以看到主线程在线程池线程完成后才输出:

# 基于CountDownLatch实现运动员赛跑
实际上CountDownLatch可以让多个线程进行等待,我们不妨用线程模拟一下所有运动员就绪后,等待枪响后起跑的场景:

代码如下,每当运动员即线程池的线程准备就绪,则调用await等待枪响,一旦所有运动员就绪之后,主线程调用countDown模拟枪响,然后运动员起跑:
Console.log("百米跑比赛开始");
int playerNum = 3;
CountDownLatch gun = new CountDownLatch(1);
ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
for (int i = 0; i < playerNum; i++) {
final int playNo = i;
threadPool.submit(() -> {
Console.log("[{}]号运动员已就绪", playNo);
try {
gun.await();
} catch (InterruptedException e) {
Console.log("[{}]号运动员线程阻塞失败,失败原因[{}]", playNo, e.getMessage(), e);
}
Console.log("[{}]号运动员已经到达终点", playNo);
});
}
//按下枪 所有运动员起跑
gun.countDown();
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
Console.log("百米赛跑已结束");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
对应的我们也给出相应的输出结果:
百米跑比赛开始
[0]号运动员已就绪
[2]号运动员已就绪
[1]号运动员已就绪
[2]号运动员已经到达终点
[0]号运动员已经到达终点
[1]号运动员已经到达终点
百米赛跑已结束
2
3
4
5
6
7
8
# 从源码角度分析CountDownLatch工作流程
我们以等待所有工人完成工作的例子进行解析,实际上在CountDownLatch是通过state和一个抽象队列即aqs完成多线程之间的流程调度,主线程调用await方法等待其他worker线程,如果其它worker线程没有完成工作,那么CountDownLatch就会将其存入抽象队列中。
一旦其他线程将state设置为0时,await对应的线程就会从抽象队列中释放并唤醒:

对应我们给出countDown的实现,可以看到该方法底层就是将aqs队列中的state进行扣减:
public void countDown() {
sync.releaseShared(1);
}
//releaseShared内部核心逻辑就是将state扣减1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
//扣减state并通过cas修改赋值
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
而countDown本质上就是查看这个state,如果state被扣减为0,则调用aqs底层doReleaseShared方法将队列中等待线程唤醒:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//查看是否扣减为0
if (tryReleaseShared(arg)) {
//如果是0则将当前等待线程唤醒
doReleaseShared();
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
上文讲解countDown涉及一些关于AQS的实用理解和设计,关于更多AQS的知识点,感兴趣的读者可以阅读一下笔者的这篇文章:
AQS 源码解析:原理与实践:https://mp.weixin.qq.com/s/vz4TctsA0JVjfYws9gfGqQ (opens new window)
# Semaphore(信号量)
# 详解Semaphore
信号量多用于限流的场景,例如我们希望单位时间内只能有一个线程工作,我们就可以使用信号量,只有拿到线程的信号量才能工作,工作完成后释放信号量,其余线程才能争抢这个信号量并进行进一步的操作。
对应我们给出下面这段代码,可以看到笔者声明信号量数值为6,每当线程拿到3个信号量之后就会执行业务操作,完成后调用release释放3个令牌,让其他线程继续争抢:
//设置可复用的信号量,令牌数为3
Semaphore semaphore = new Semaphore(6, true);
//创建5个线程
int workSize = 5;
ExecutorService executorService = Executors.newFixedThreadPool(workSize);
for (int i = 0; i < workSize; i++) {
executorService.submit(() -> {
try {
//拿3个令牌
semaphore.acquire(3);
log.info("进行业务逻辑处理.......");
ThreadUtil.sleep(1000);
//释放3个令牌
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
对应输出结果如下,可以看到每个线程拿到令牌后都会休眠1秒,从输出结果来看每秒只有两个线程才工作,符合我们的限流需求:

# 详解Semaphore工作原理
Semaphore底层也是用到的aqs队列,线程进行资源获取时也是通过查看state是否足够,在明确足够的情况下进行state扣减,然后进行工作。如果线程发现state数量不够,那么就会被Semaphore存入aqs底层的抽象队列中,直到state数量足够后被唤醒:

对此我们给出Semaphore底层的acquire的逻辑可以看到,它会读取state数值然后进行扣减,如果剩余数量大于0则说明令牌获取成功线程可以执行后续逻辑,反之说明当前令牌数不够,外部逻辑会将该线程挂到等待队列中,等待令牌足够后将其唤醒:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
//读取可用的state
int available = getState();
//计算剩余的state
int remaining = available - acquires;
//如果小于0说明令牌数不足直接返回出去,让外部将线程挂起,反之通过cas修改剩余数,返回大于0的结果让持有令牌的线程执行后续逻辑
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 基于Semaphore实现一个有界容器
利用Semaphore信号量并发获取且资源循环可复用的特性,我们可以通过实例封闭技术落地一个有界的容器,如下代码所示,只有得到信号量且添加成功了信号量才会成功扣减,如果没有拿到信号量就阻塞无法添加,除非其他线程释放自己的资源。
如下图,笔者利用信号量实现一个列表容器的限流设置,可以看到当前容器还剩一个空间,所以信号量数也是1,当线程0获得信号量成功后将元素24添加至容器中。随后的线程1看到信号量为0,即知晓容器没有可用空间就会被阻塞等待:

一旦线程1删除一个元素成功后,就会归还一个令牌,此时线程1就会被信号量唤醒,尝试获取令牌并添加元素,这就是我们有界容器实现的核心思路:

对应的我们给出有界容器的落地代码示例:
public class BoundedList<E> {
private final List<E> list;
private final Semaphore semaphore;
/*
初始化一个并发的有界容器
*/
public BoundedList(int bound) {
this.list = Collections.synchronizedList(new ArrayList<>());
this.semaphore = new Semaphore(bound);
}
public boolean add(E element) {
boolean wasAdded = false;
try {
//获取令牌,成功后才会添加容器
semaphore.acquire();
wasAdded = list.add(element);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//添加失败则释放令牌,让其他线程可以尝试到该有界容器中添加
if (!wasAdded)
semaphore.release();
return wasAdded;
}
}
public void remove(E element) {
boolean remove = false;
try {
remove = list.remove(element);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//只有明确元素移除成功,才会释放令牌
if (remove)
semaphore.release();
}
}
@Override
public String toString() {
return JSONUtil.toJsonStr(list);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
对应测试代码如下,大体思路为:
- 尝试让线程0填满容器使线程1阻塞
- 随后线程0移除一个元素
- 线程1被唤醒,并成功获取令牌,将元素5成功添加
BoundedList<Integer> list = new BoundedList<>(5);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
//添加5个元素填满容器
Console.log("线程1添加5个元素");
for (int i = 0; i < 5; i++) {
list.add(i);
}
ThreadUtil.sleep(5000);
//移除元素2,让线程2添加元素5成功
Console.log("线程1移除元素2");
list.remove(2);
countDownLatch.countDown();
}).start();
new Thread(() -> {
ThreadUtil.sleep(1000);
Console.log("线程2添加元素5");
list.add(5);
Console.log("线程2添加元素5成功");
countDownLatch.countDown();
}).start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Console.log("线程1和线程2执行完毕,有界容器元素:{}", list);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
输出结果如下,符合我们对有界容器预期:
线程1添加5个元素
线程2添加元素5
线程1移除元素2
线程2添加元素5成功
线程1和线程2执行完毕,有界容器元素:[0,1,3,4,5]
2
3
4
5
# Semaphore使用注意事项
- 获取和释放的时候都可以指定数量,但是要保持一致。
- 公平性设置为true会更加合理
- 并不必须由获取许可证的线程释放许可证。可以是A获取,B释放。
# Condition
# 详解Condition
Condition即条件对象,不是很常用或者直接用到的对象,常用于线程等待唤醒操作,例如A线程需要等待某个条件的时候,我们可以通过condition.await()方法,A线程就会进入阻塞状态。
线程B执行condition.signal()方法,则JVM就会从被阻塞线程中找到等待该condition的线程。线程A收到可执行信号的时候,他的线程状态就会变成Runnable可执行状态。

对此我们给出代码示例,可以看到我们从ReentrantLock 中拿到一个Condition 对象,让创建的线程进入等待状态,随后让主线程调用condition 的signal将其唤醒:
private ReentrantLock lock = new ReentrantLock();
//条件对象,操控线程的等待和通知
private Condition condition = lock.newCondition();
public void waitCondition() throws InterruptedException {
lock.lock();
try {
log.info("等待达到条件后通知");
condition.await();
log.info("收到通知,开始执行业务逻辑");
} finally {
lock.unlock();
log.info("执行完成,释放锁");
}
}
public void notifyCondition() throws InterruptedException {
lock.lock();
try {
log.info("达到条件发起通知");
condition.signal();
log.info("发起通知结束");
} finally {
lock.unlock();
log.info("发起通知执行完成,释放锁");
}
}
public static void main(String[] args) throws InterruptedException {
Main obj = new Main();
new Thread(() -> {
try {
obj.waitCondition();
//让出CPU时间片,交给主线程发起通知
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error("等待条件通知设置失败,失败原因 [{}]", e.getMessage(), e);
}
}).start();
//休眠3s唤醒等待线程
Thread.sleep(3000);
obj.notifyCondition();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
对应的我们也给出输出结果:

# 基于条件对象完成生产者、消费者模式
我们假设用一个队列存放一波生产者生产的资源,当资源满了通知消费者消费。当消费者消费空了,通知生产者生产。
所以这时候使用condition控制流程最合适(这也是阻塞的队列内部的实现),所以我们要定义两个信号,分别为:
- 当资源被耗尽,我们就使用资源未满条件(
notFull): 调用signal通知生产者消费,消费者调用await进入等待。 - 当资源被填满,使用资源为空条件(
notEmpty):将生产者用await方法挂起,消费者用signal唤醒消费告知非空。
很明显生产者和消费者本质上就是基于这两个标识分别标志自己的等待时机和通知时机,以生产者为例,即每生产一个资源后就可以调用notEmpty通知消费者消费,当生产者速度过快,则用await等待未满notFull条件阻塞:

首先我们给出生产者和消费者条件和资源队列声明,基于上述条件我们给出一个经典的生产者和消费者模式的示例,我们首先给出生产者代码,可以看到资源满的时候调用notFull.await();将自己挂起等待未满,生产资源后调用 notEmpty.signal();通知消费者消费。
对应消费者示例代码也是一样,当资源消费完全,调用notEmpty.await();等待不空,一旦消费定量资源调用notFull.signal();通知生产者生产。
最终代码示例如下:
@Slf4j
public class ProducerMode {
//锁
private static ReentrantLock lock = new ReentrantLock();
// 资源未满
private Condition notFull = lock.newCondition();
//资源为空
private Condition notEmpty = lock.newCondition();
private Queue<Integer> queue = new PriorityQueue<>(10);
private int queueMaxSize = 10;
/**
* 生产者
*/
private class Producer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (queueMaxSize == queue.size()) {
log.info("当前队列已满,通知消费者消费");
//等待不满条件触发
notFull.await();
}
queue.offer(1);
log.info("生产者补货,当前队列有 【{}】", queue.size());
//通知消费者队列不空,可以消费
notEmpty.signal();
} catch (Exception e) {
log.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
/**
* 消费者
*/
private class Consumer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (0 == queue.size()) {
log.info("当前队列已空,通知生产者补货");
//等待不空条件达到
notEmpty.await();
}
queue.poll();
//通知消费者不满了
notFull.signal();
log.info("消费者完成消费,当前队列还剩余 【{}】个元素", queue.size());
} catch (Exception e) {
log.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerMode mode = new ProducerMode();
Producer producer = mode.new Producer();
ProducerMode.Consumer consumer = mode.new Consumer();
producer.start();
consumer.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
对应的我们给出输出结果:

# CyclicBarrier
# CyclicBarrier 原理和使用示例
CyclicBarrier 也就是循环栅栏对象,不是很常用,它主要用于等待线程数就绪后执行公共逻辑的业务场景。
例如我们希望每凑齐5个线程后执行后续逻辑,我们就可以说明CyclicBarrier 数值为5,然后每个线程到期后调用await等待其他线程就绪。
一旦到齐5个,CyclicBarrier 就会通知这些线程开始工作,对应的代码如下所示:
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println("线程 " + Thread.currentThread().getName() + " 开始执行任务");
try {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("线程 " + Thread.currentThread().getName() + " 到达屏障");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有线程都到达屏障,一起继续执行");
}).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
对应的我们给出相应输出示例:

# CyclicBarrier 下的多核并发运算技巧
利用循环栅栏的特点,我们可以很好基于计算机核心数完成所有的耗时运算,等待所有计算完成之后,通过栅栏来汇聚计算结果打印输出:

对应我们给出主线程的实现,可以看到该处理器会得到一个与核心数一致的列表,并将列表中的每个子列表交由worker线程处理,每当worker完成列表中一个元素运算后,就会触发栅栏的方法打印结果:
public class ArraySquareCalculator {
private final List<List<Integer>> taskList;
private final Worker[] workers;
private final CyclicBarrier barrier;
public ArraySquareCalculator(List<List<Integer>> taskList) {
if (taskList == null || taskList.isEmpty()) {
throw new RuntimeException("任务列表不能为空");
}
if (taskList.size() != Runtime.getRuntime().availableProcessors()) {
throw new RuntimeException("任务列表数量必须等于CPU数量");
}
this.taskList = taskList;
barrier = new CyclicBarrier(taskList.size(), () -> {
Console.log("所有线程都到达屏障,执行结束");
Console.log("执行结果:{}", JSONUtil.toJsonStr(taskList));
});
workers = new Worker[taskList.size()];
for (int i = 0; i < taskList.size(); i++) {
workers[i] = new Worker(i, taskList, barrier);
}
}
//启动核心数对应的工作线程执行运算
public synchronized void start() {
for (Worker worker : workers) {
new Thread(worker).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
对应的我们也给出worker子线程代码,可以看到核心数对应的子线程worker完成各自负责列表的元素运算后,就会通过栅栏提交给主线程告知完成:
public class Worker implements Runnable {
private final int elementIdx;
private final List<List<Integer>> list;
private final CyclicBarrier barrier;
Worker(int elementIdx, List<List<Integer>> list, CyclicBarrier cyclicBarrier) {
this.elementIdx = elementIdx;
this.list = list;
this.barrier = cyclicBarrier;
}
@SneakyThrows
@Override
public void run() {
//每个核心对应的线程处理各自索引列表
List<Integer> workList = list.get(elementIdx);
for (int i = 0; i < workList.size(); i++) {
//完成负责列表元素计算后,通过屏障等待所有线程完成
workList.set(i, workList.get(i) << 1);
barrier.await();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
对应的我们也给出测试代码:
//创建一个与核心数一样的列表
int size = Runtime.getRuntime().availableProcessors();
List<List<Integer>> list = new ArrayList<>();
//添加元素到列表中
for (int i = 0; i < size; i++) {
ArrayList<Integer> arrayList = new ArrayList<>();
for (int j = 1; j <= 3; j++) {
arrayList.add(j);
}
list.add(arrayList);
}
//启动并行运算处理器
ArraySquareCalculator calculator = new ArraySquareCalculator(list);
calculator.start();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
输出结果如下,与预期一致:

# CyclicBarrier如何控制并发
以上面并行核心线程运算逻辑为例,本质上await方法调用后底层就会完成count扣减,当count为0后就会触发一次主线程逻辑调用,也就是我们的打印输出,即通过count来完成线程之间的循环并发流程阻塞和通知:

对应的我们也给出await的源码,可以看到其内部是通过调用dowait执行上述所说逻辑:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
2
3
4
5
6
7
8
9
10
11
查看dowait即可印证我们的逻辑:
- 所有线程调用await执行count扣减
- count为0调用barrierCommand也就是我们初始化时设置的打印输出方法
- 完成barrierCommand任务执行后调用nextGeneration将count重置为初始化时的数值,对应的我们的代码就是CPU核心数
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//......
int index = --count;
//count扣减为0 步入执行逻辑
if (index == 0) { // tripped
boolean ranAction = false;
try {
//调用barrierCommand执行归并逻辑运算
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//将count重置为初始值
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//......
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 关于CyclicBarrier 与CountDownLatch的更进一步的理解
# 二者使用维度上的理解
基于上述的例子,我们不难看出CyclicBarrier 与CountDownLatch在使用维度和工作理念上的区别,在执行层面上,CountDownLatch针对的是事件而非针线程,即通过事件对象countdown结束阻塞,即针对CountDownLatch指定次数n并非需要n个线程把控,例如CountDownLatch设置为3,即使只有两个线程也依然可以释放CountDownLatch这个闭锁从而唤醒阻塞线程:

而CyclicBarrier 循环栅栏作用于线程,即指定CyclicBarrier的线程数parties,就必须有parties个线程就绪:

同时在使用的角度来说,循环栅栏CyclicBarrier可重复使用,CountDownLatch则不能
# 关于CountDownLatch不完美的就绪和解决思路
上文例子中笔者基于CountDownLatch给出了一个运动员赛跑的例子,但这个例子存在一个小小的瑕疵,即利用CountDownLatch避免了运动员提前起跑,但未能保证所有运动员准备就绪。如下图,在CountDownLatch未执行countDown时,线程0和线程1已经到达await阻塞,但是线程2还未到达await,此时就会出现这种情况:
- CountDownLatch执行countDown,唤醒阻塞线程
- 线程0和线程1起跑
- 线程0和线程1到达终点
- 线程2才到达await,此时已经countDown,在上述线程完成比赛后才到终点

如下所示,笔者这里利用线程模式调试复现出了该问题:
百米跑比赛开始
[0]号运动员已就绪
[1]号运动员已就绪
[0]号运动员已经到达终点
[1]号运动员已经到达终点
[2]号运动员已就绪
[2]号运动员已经到达终点
2
3
4
5
6
7
所以,我们就必须使用某个让所有线程就绪之后再使用CountDownLatch模拟枪响后运动员起跑,于是我们就想到以线程为维度控制并发的CyclicBarrier,即利用CyclicBarrier确保所有运动员就绪后,再执行CountDownLatch让所有线程同时起跑运行:

对应我们给出优化后的代码:
Console.log("百米跑比赛开始");
int playerNum = 3;
CountDownLatch gun = new CountDownLatch(1);
CyclicBarrier barrier = new CyclicBarrier(playerNum, () -> {
Console.log("所有运动员已就绪,开始比赛");
//按下枪 所有运动员起跑
gun.countDown();
});
ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
for (int i = 0; i < playerNum; i++) {
final int playNo = i;
threadPool.submit(() -> {
try {
Console.log("[{}]号运动员已就绪", playNo);
barrier.await();
gun.await();
} catch (Exception e) {
Console.log("[{}]号运动员线程阻塞失败,失败原因[{}]", playNo, e.getMessage(), e);
}
Console.log("[{}]号运动员已经到达终点", playNo);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
Console.log("百米赛跑已结束");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
对应输出结果如下,可以看到此时所有运动员就是同时就绪,同时起跑了:
百米跑比赛开始
[0]号运动员已就绪
[2]号运动员已就绪
[1]号运动员已就绪
所有运动员已就绪,开始比赛
[1]号运动员已经到达终点
[2]号运动员已经到达终点
[0]号运动员已经到达终点
百米赛跑已结束
2
3
4
5
6
7
8
9
# 小结
以上便是笔者对于各个流程并发工具的剖析和实践,希望对你有帮助。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 参考
控制并发流程:https://blog.csdn.net/DLC990319/article/details/106681012) (opens new window)
《Java并发编程实战》