Juc 提供了很多关于线程安全的类,比如:

  • 线程池相关的: ThreadPoolExecutorExecutors 等;
  • 锁相关的,如 LockReadWriteLockReentrantLock , ReentrantReadWriteLock 等;
  • 线程安全的数据结构,应用于线程安全的集合中,如 ConcurrentHashMapLinkedBlockingQueueDelayQueue 等;
  • 还有高级的线程同步的类,如 CountDownLatchCyclicBarrierSemaphore , Phaser 等。
  • 还有一些原子类。如 AtomicInteger , AtomicLong , AtomicReference 等等

今天我们就来看下 JUC 提供的线程同步类。

# CountDownLatch

CountDownLatch 一种允许一个或多个线程等待,直到在其他线程中执行的一组操作完成的同步辅助工具类。 可以理解为这是一个计数器。

CountDownLatch 提供了个好用的方法:

  • countDown() :调用一次,计数器减 1 。直到减为 0.
  • await() :当计数器不为 0 时,则调用该方法的线程阻塞,当计数器为 0 时,可以唤醒等待的一个或者全部线程。

# CountDownLatch 使用场景:

  • 排队,比如排队做核酸。假设有个 5 个人做核酸.

CountDownLatch 示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo{
public static void main(String[] args) throws InterruptedException {
CountDownLatch nucleinCountDownLatch = new CountDownLatch(5);
System.out.println("开始做核酸啦~~~");
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int j = i;
executorService.execute(() -> {
System.out.println("user" + j + " "+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "做完了核酸");
nucleinCountDownLatch.countDown();
});
Thread.sleep(2000);
}
nucleinCountDownLatch.await();
System.out.println("核算员着急忙慌的下班了, 可担心再来一个做核酸的....");
executorService.shutdown();
}
}

以上程序执行结果如下:

1
2
3
4
5
6
7
开始做核酸啦~~~
user:0 2023-02-03 22:26:44做完了核酸
user:1 2023-02-03 22:26:46做完了核酸
user:2 2023-02-03 22:26:48做完了核酸
user:3 2023-02-03 22:26:50做完了核酸
user:4 2023-02-03 22:26:52做完了核酸
核算员着急忙慌的下班了, 可担心再来一个做核算的....

# CyclicBarrier

CyclicBarrier , 直译过来就是 循环屏障。它允许一组线程全部等待,直接这组线程到达相同的状态的一种同步辅助工具. CyclicBarriers 在涉及必须偶尔相互等待的固定大小线程组的程序中很有用。 屏障之所以称为循环屏障,是因为它可以在等待线程被释放后重新使用。这句话敲重点!!!

使用起来也是相当简单!

就是一个方法,await (). 等待这一组线程都执行相同的状态。与 CountDownLatch 不同的是, CyclicBarrier 是可以重复使用的呦。

CyclicBarrier 应用起来太广泛了,我司有班车,不好意思,我司早晚提供免费班车… 你懂得 fa~

上代码…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {

// 假设每个班车有50个座位.
CyclicBarrier cyclicBarrier = new CyclicBarrier(50, () -> System.out.println("满座了! 开始起飞....."));

// 100人
for (int i = 0; i < 100; i++) {
new Thread(new RegularBus(cyclicBarrier, i)).start();
}
}


/**
* 班车
*/
static class RegularBus implements Runnable {
private final CyclicBarrier cyclicBarrier;

/**
* 员工工号
*/
private final Integer ppId;

RegularBus(CyclicBarrier cyclicBarrier, Integer poorPeopleId) {
this.cyclicBarrier = cyclicBarrier;
this.ppId = poorPeopleId;
}

@Override
public void run() {
if (ppId % 10 == 0) {
System.out.println("工号为" + ppId + "的员工抢到了一个的座位.....");
}
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

以上程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
工号为10的员工抢到了一个的座位.....
工号为0的员工抢到了一个的座位.....
工号为30的员工抢到了一个的座位.....
工号为20的员工抢到了一个的座位.....
工号为40的员工抢到了一个的座位.....
满座了! 开始起飞.....
工号为50的员工抢到了一个的座位.....
工号为60的员工抢到了一个的座位.....
工号为70的员工抢到了一个的座位.....
工号为80的员工抢到了一个的座位.....
工号为90的员工抢到了一个的座位.....
满座了! 开始起飞.....

# Semaphore

Semaphore 这个更简单了,直译!信号量。 他用于管理多线程中控制资源的访问与使用。 常见的就是限流。对应生活中的栗子,我想一下哈!

还是我司! 门卫!每天早上,我司大门口,都有两个五大三粗的壮汉在数 进来了多少辆车,出去了多少辆车。还时不时的用对讲机:洞幺, 洞幺, 我是洞两,还有几个车位,还有几个车位.
对讲机那边:我草,我看看。地下没有车位了,地上问问拐哥.
洞两: “洞拐,洞拐,地上还有几个车位?”
洞拐: “多得很,多得很。”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 
public class SemaphoreTest {

public static void main(String[] args) {

// 车位总数
Semaphore semaphore = new Semaphore(5);

// 开车上班的人
ThreadPoolExecutor richManThreadPool = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

for (int i = 0; i < 10; i++) {
int finalI = i;
richManThreadPool.execute(() -> {
try {
// 堵塞获取许可,即获取一个车位.
semaphore.acquire();
System.out.println("开车上班的员工:" + Thread.currentThread().getName() + " 抢到了一个车位:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
// 假设在上班中....
TimeUnit.SECONDS.sleep(2);
// 释放许可, 开车回家了...
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}

以上程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
开车上班的员工:pool-1-thread-5 抢到了一个车位:2023-02-03 23:01:07
开车上班的员工:pool-1-thread-2 抢到了一个车位:2023-02-03 23:01:07
开车上班的员工:pool-1-thread-1 抢到了一个车位:2023-02-03 23:01:07
开车上班的员工:pool-1-thread-3 抢到了一个车位:2023-02-03 23:01:07
开车上班的员工:pool-1-thread-4 抢到了一个车位:2023-02-03 23:01:07
开车上班的员工:pool-1-thread-6 抢到了一个车位:2023-02-03 23:01:09
开车上班的员工:pool-1-thread-7 抢到了一个车位:2023-02-03 23:01:09
开车上班的员工:pool-1-thread-8 抢到了一个车位:2023-02-03 23:01:09
开车上班的员工:pool-1-thread-9 抢到了一个车位:2023-02-03 23:01:09
开车上班的员工:pool-1-thread-10 抢到了一个车位:2023-02-03 23:01:09

# Phaser

说实话,这个东西,是我写这篇文章,收获最大的一个东西。扫盲了,在写这篇文章之前,我确实不知道还有这哥们.

Phaser (移相器)是 JDK 7 提供的,是一个可重用的同步屏障,在功能上类似于 CyclicBarrier 和 CountDownLatch,但支持更灵活的使用。 它的功能是等待所有线程到达之后,才继续或者开始进行新的一组任务。

Phaser 的功能非常强大,所以方法也比较多,我简单一说,大家细细研究!

  • register() :注册新的参与者到 Phaser
  • arriveAndAwaitAdvance() :等待其他线程执行
  • arriveAndDeregister() :注销此线程
  • forceTermination() :强制 Phaser 进入终止态
  • isTerminated() :判断 Phaser 是否终止

这个我司的 case 就不好举例了。因为班车的栗子举过了呀。 所以,我就想起了有一次我在故宫玩,看到了一个旅行团…

导游: “首先,这里我看到的是:太和殿,xxx (巴啦啦,说一通历史), 大家先去拍照,拍完照到我这里集合,咱们去下一个殿”
… 旅客在搔首弄姿…
导游: “那么,这里我看到的是:中和殿,xxx (巴啦啦,说一通历史), 大家先去拍照,拍完照到我这里集合,咱们去下一个殿”
… 旅客在搔首弄姿…・
导游: “最后,这里我看到的是:保和殿,xxx (巴啦啦,说一通历史), 大家先去拍照,拍完照到我这里集合,咱们去看看皇帝和皇后睡觉的地方”
旅客:拍啥拍了,直接去看吧

Phaser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.Random;
import java.util.concurrent.Phaser;

public class PhaserTest {
public static void main(String[] args) throws InterruptedException {
// 导游
Phaser phaser = new TouristGuide();
// 游客
Tourist[] tourists = new Tourist[5];
for (int i = 0; i < tourists.length; i++) {
tourists[i] = new Tourist(phaser, "游客" + i);
// 注册 Phaser 等待的线程数,执行一次等待线程数 +1
phaser.register();
}

// 每个游客自行活动..
for (int i = 0; i < tourists.length; i++) {
// 执行任务
new Thread(tourists[i]).start();
}
}

static class Tourist implements Runnable {
private final Phaser phaser;

private String name;

public Tourist(Phaser phaser, String name) {
this.phaser = phaser;
this.name = name;
}

@Override
public void run() {
System.out.println(this.name + " | 到了");
phaser.arriveAndAwaitAdvance();
try {
Thread.sleep(new Random().nextInt(5) * 1000);
System.out.println(this.name + " | 拍完照片了...");
phaser.arriveAndAwaitAdvance();
Thread.sleep(new Random().nextInt(5) * 1000);
System.out.println(this.name + " | 拍完照片了...");
phaser.arriveAndAwaitAdvance();
Thread.sleep(new Random().nextInt(5) * 1000);
System.out.println(this.name + ":\"拍啥拍,赶紧去看皇帝皇后睡觉吧\"");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// Phaser 每个阶段完成之后的事件通知. (===> 导游)
static class TouristGuide extends Phaser {
/**
* 每个阶段执行完之后的回调
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0 -> {
System.out.println("首先,这里我看到的是:太和殿, xxx(巴啦啦,说一通历史), 大家先去拍照, 拍完照到我这里集合, 咱们去下一个殿");
return false;
}
case 1-> {
System.out.println("那么,这里我看到的是:中和殿, xxx(巴啦啦,说一通历史), 大家先去拍照, 拍完照到我这里集合, 咱们去下一个殿");
return false;
}
case 2 -> {
System.out.println("最后,这里我看到的是:保和殿, xxx(巴啦啦,说一通历史), 大家先去拍照, 拍完照到我这里集合, 咱们去看看皇帝和皇后睡觉的地方");
return false;
}
default -> {
return true;
}
}
}
}
}

以上程序执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
游客0 | 到了
游客4 | 到了
游客3 | 到了
游客2 | 到了
游客1 | 到了
首先,这里我看到的是:太和殿, xxx(巴啦啦,说一通历史), 大家先去拍照, 拍完照到我这里集合, 咱们去下一个殿
游客3 | 拍完照片了...
游客1 | 拍完照片了...
游客2 | 拍完照片了...
游客0 | 拍完照片了...
游客4 | 拍完照片了...
那么,这里我看到的是:中和殿, xxx(巴啦啦,说一通历史), 大家先去拍照, 拍完照到我这里集合, 咱们去下一个殿
游客3 | 拍完照片了...
游客4 | 拍完照片了...
游客0 | 拍完照片了...
游客1 | 拍完照片了...
游客2 | 拍完照片了...
最后,这里我看到的是:保和殿, xxx(巴啦啦,说一通历史), 大家先去拍照, 拍完照到我这里集合, 咱们去看看皇帝和皇后睡觉的地方
游客0:"拍啥拍,赶紧去看皇帝皇后睡觉吧"
游客2:"拍啥拍,赶紧去看皇帝皇后睡觉吧"
游客1:"拍啥拍,赶紧去看皇帝皇后睡觉吧"
游客4:"拍啥拍,赶紧去看皇帝皇后睡觉吧"
游客3:"拍啥拍,赶紧去看皇帝皇后睡觉吧"

# 最后

希望和你一起遇见更好的自己

更新于 阅读次数

请我喝[咖啡]~( ̄▽ ̄)~*

方小白 微信支付

微信支付

方小白 支付宝

支付宝

方小白 numberpay

numberpay