Java并发-Semaphore

概述

Semaphore(信号量)是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效的控制并发数,利用它可以很好的实现流量控制。
Semaphore提供了一个许可证的概念,可以把这个许可证看作车票,只有成功获取车票的人才能够上车,并且车票是有数量的,不可能毫无限制的发下去,这样就会导致车辆超载。所以当车票发完的时候(车辆满载),其他人就只能等下一趟车。如果中途有人下车,那么他的位置将会空闲出来,因此如果这时其他人想要上车的话就又可以获得车票了。

构造器

1
2
3
4
5
6
7
8
9
//构造器1
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

//构造器2
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore提供了两个带参构造器,没有提供无参构造器。这两个构造器都必须传入一个初始的许可证数量,使用构造器1构造出来的信号量在获取许可证时会采用非公平方式获取,使用构造器2可以通过参数指定获取许可证的方式(公平or非公平)。Semaphore主要对外提供了两类API,获取许可证和释放许可证,默认的是获取和释放一个许可证,也可以传入参数来同时获取和释放多个许可证。

获取许可证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//获取一个许可证(响应中断)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//获取一个许可证(不响应中断)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}

//尝试获取许可证(非公平获取)
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

//尝试获取许可证(定时获取)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

上面的API是Semaphore提供的默认获取许可证操作。每次只获取一个许可证。除了直接获取还提供了尝试获取,直接获取操作在失败之后可能会阻塞线程,而尝试获取则不会。另外还需注意的是tryAcquire方法是使用非公平方式尝试获取的。平时比较常用到的是acquire方法去获取许可证。acquire方法里面直接就是调用sync.acquireSharedInterruptibly(1),这个方法是AQS里面的方法,下面再来回顾一下这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
//以可中断模式获取锁(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//首先判断线程是否中断, 如果是则抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
//1.尝试去获取锁,获取成功直接返回
if (tryAcquireShared(arg) < 0) {
//2. 如果获取失败则进入该方法
doAcquireSharedInterruptibly(arg);
}
}

acquireSharedInterruptibly方法首先就是去调用tryAcquireShared方法去尝试获取,tryAcquireShared在AQS里面是抽象方法,FairSync和NonfairSync这两个派生类实现了该方法的逻辑。FairSync实现的是公平获取的逻辑,而NonfairSync实现的非公平获取的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
abstract static class Sync extends AbstractQueuedSynchronizer {
//非公平方式尝试获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取可用许可证
int available = getState();
//获取剩余许可证
int remaining = available - acquires;
//1.如果remaining小于0则直接返回remaining
//2.如果remaining大于0则先更新同步状态再返回remaining
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}

//非公平同步器
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

//尝试获取许可证
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

//公平同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

//尝试获取许可证
protected int tryAcquireShared(int acquires) {
for (;;) {
//判断同步队列前面有没有人排队
if (hasQueuedPredecessors()) {
//如果有的话就直接返回-1,表示尝试获取失败
return -1;
}
//获取可用许可证
int available = getState();
//获取剩余许可证
int remaining = available - acquires;
//1.如果remaining小于0则直接返回remaining
//2.如果remaining大于0则先更新同步状态再返回remaining
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}

NonfairSync的tryAcquireShared方法直接调用的是nonfairTryAcquireShared方法,这个方法是在父类Sync里面的。非公平获取锁的逻辑是先取出当前同步状态(同步状态表示许可证个数),将当前同步状态减去传入的参数,如果结果不小于0的话证明还有可用的许可证,那么就直接使用CAS操作更新同步状态的值,最后不管结果是否小于0都会返回该结果值。这里要了解tryAcquireShared方法返回值的含义,返回负数表示获取失败,零表示当前线程获取成功但后续线程不能再获取,正数表示当前线程获取成功并且后续线程也能够获取。再来看看acquireSharedInterruptibly方法的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//以可中断模式获取锁(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//首先判断线程是否中断, 如果是则抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
//1.尝试去获取锁
//负数:表示获取失败
//零值:表示当前线程获取成功, 但是后继线程不能再获取了
//正数:表示当前线程获取成功, 并且后继线程同样可以获取成功
if (tryAcquireShared(arg) < 0) {
//2. 如果获取失败则进人该方法
doAcquireSharedInterruptibly(arg);
}
}

如果返回的remaining小于0的话就代表获取失败,因此tryAcquireShared(arg) < 0就为true,所以接下来就会调用doAcquireSharedInterruptibly方法,这个方法是AQS的方法,它会将当前线程包装成结点放入同步队列尾部,并且有可能挂起线程。这也是当remaining小于0时线程会排队阻塞的原因。而如果返回的remaining>=0的话就代表当前线程获取成功,因此tryAcquireShared(arg) < 0就为flase,所以就不会再去调用doAcquireSharedInterruptibly方法阻塞当前线程了。以上是非公平获取的整个逻辑,而公平获取时仅仅是在此之前先去调用hasQueuedPredecessors方法判断同步队列是否有人在排队,如果有的话就直接return -1表示获取失败,否则才继续执行下面和非公平获取一样的步骤。

释放许可

1
2
3
4
//释放一个许可证
public void release() {
sync.releaseShared(1);
}

调用release方法是释放一个许可证,它的操作很简单,就调用了AQS的releaseShared方法,下面来看看这个方法。

1
2
3
4
5
6
7
8
9
10
//释放锁的操作(共享模式)
public final boolean releaseShared(int arg) {
//1.尝试去释放锁
if (tryReleaseShared(arg)) {
//2.如果释放成功就唤醒其他线程
doReleaseShared();
return true;
}
return false;
}

AQS的releaseShared方法首先调用tryReleaseShared方法尝试释放锁,这个方法的实现逻辑在子类Sync里面。Semaphore重写了tryReleaseShared(),它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取“可以获得的信号量的许可数”
int current = getState();
// 获取“释放releases个信号量许可之后,剩余的信号量许可数”
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 设置“可以获得的信号量的许可数”为next。
if (compareAndSetState(current, next))
return true;
}
}

tryReleaseShared方法里面采用for循环进行自旋,首先获取同步状态,将同步状态加上传入的参数,然后以CAS方式更新同步状态,更新成功就返回true并跳出方法,否则就继续循环直到成功为止。
如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doReleaseShared() {
for (;;) {
// 获取同步队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

doReleaseShared()会释放“共享锁”。它会从前往后的遍历同步队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

应用示例

利用Semaphore实现数据库连接池。

连接池代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main.java.com.study.lock.semaphore;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* @author: whb
* @description: 连接池
*/
public class ConnectPool {
/**
* 连接池大小
*/
private int size;
/**
* 连接集合
*/
private Connect[] connects;
/**
* 连接状态标志
*/
private boolean[] connectFlag;
/**
* 剩余可用连接数
*/
private volatile int available;
/**
* 信号量
*/
private Semaphore semaphore;

public ConnectPool(int size) {
this.size = size;
this.available = size;
semaphore = new Semaphore(size, true);
connects = new Connect[size];
connectFlag = new boolean[size];
initConnects();
}

/**
* 初始化连接
*/
private void initConnects() {
//生成指定数量的连接
for (int i = 0; i < this.size; i++) {
connects[i] = new Connect();
}
}

/**
* 获取连接
*/
private synchronized Connect getConnect() {
for (int i = 0; i < connectFlag.length; i++) {
//遍历集合找到未使用的连接
if (!connectFlag[i]) {
//将连接设置为使用中
connectFlag[i] = true;
//可用连接数减一
available--;
System.out.println("【" + Thread.currentThread().getName() + "】已获取连接,剩余可用连接:" + available);
//返回连接
return connects[i];
}
}
return null;
}

/**
* 打开链接
*
* @return
*/
public Connect openConnect() throws InterruptedException {
//获取许可
semaphore.acquire();
//获取连接
return getConnect();
}

/**
* 释放连接
*/
public synchronized void release(Connect connect) {
for (int i = 0; i < this.size; i++) {
if (connect == connects[i]) {
//将连接设置为未使用
connectFlag[i] = false;
//可用连接加一
available++;
System.out.println("【" + Thread.currentThread().getName() + "】已释放连接,剩余可用连接:" + available);
//释放许可
semaphore.release();
}
}
}
}

class Connect {
private static int count = 1;
private int id = count++;

public Connect() {
//模拟打开一个连接很耗费资源,需要等待1秒
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("连接#" + id + "#已与数据库建立通道!");
}

@Override
public String toString() {
return "【" + id + "】";
}
}

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main.java.com.study.lock.semaphore;

/**
* @author: whb
* @description: 测试连接池
*/
public class TestConnectPool extends Thread {

private static ConnectPool pool = new ConnectPool(3);

@Override
public void run() {
try {
Connect connect = pool.openConnect();
Thread.sleep(100);
pool.release(connect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new TestConnectPool().start();
}
}
}

执行结果:
img

本文标题:Java并发-Semaphore

文章作者:王洪博

发布时间:2019年03月01日 - 09:03

最后更新:2019年09月12日 - 10:09

原始链接:http://whb1990.github.io/posts/44092bd5.html

▄︻┻═┳一如果你喜欢这篇文章,请点击下方"打赏"按钮请我喝杯 ☕
0%