分布式学习-分布式锁

前言

多线程情况下对共享资源的操作需要加锁,避免数据被写乱,在分布式系统中,这个问题也是存在的,此时就需要一个分布式锁服务。

分布式锁的特点

分布式锁一般要有以下特点:

  • 排他性:任意时刻,只能有一个client能获取到锁;
  • 容错性:分布式锁服务一般要满足AP,也就是说,只要分布式锁服务集群节点大部分存活,client就可以进行加锁解锁操作;
  • 避免死锁:分布式锁一定能得到释放,即使client在释放之前崩溃或者网络不可达而没有主动解锁,也能保证后续client能加锁。
    除了以上特点之外,分布式锁最好也能满足可重入、高性能、阻塞锁特性(AQS这种,能够及时从阻塞状态唤醒)等。

分布式锁方案

分布式锁一般有三种实现方式:

  1. 基于数据库的分布式锁;
  2. 基于Redis的分布式锁;
  3. 基于ZooKeeper的分布式锁

基于数据库的分布式锁

在数据库新建一张表用于控制并发控制,表结构可以如下所示:

1
2
3
4
5
6
7
8
CREATE TABLE `lock_table` (
`id` int(11) unsigned NOT NULL COMMENT '主键',
`key_id` bigint(20) NOT NULL COMMENT '分布式key',
`memo` varchar(50) NOT NULL DEFAULT '' COMMENT '可记录操作内容',
`update_time` datetime NOT NULL COMMENT '更新时间',
PRIMARY kEY (`id`,'key_id'),
UNIQUE KEY `key_id` (`key_id`) USING BETREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8

key_id作为分布式key用来并发控制,memo可用来记录一些操作内容(比如memo可用来支持重入特性,标记下当前加锁的client和加锁次数)。

key_id设置为唯一索引,保证了针对同一个key_id只有一个加锁(数据插入)能成功。此时lock和unlock伪代码如下:

1
2
3
4
5
6
7
8
9
def lock : 
exec sql: insert into lock_table(key_id, memo, update_time) values (key_id, memo, NOW())
if result == true :
return true
else :
return false

def unlock :
exec sql: delete from lock_table where key_id = 'key_id' and momo = 'memo'

注意,伪代码中的lock操作是非阻塞锁,也就是tryLock,如果想实现阻塞(或者阻塞超时)加锁,只需要反复执行lock伪代码直到加锁成功为止即可。

基于DB的分布式锁其实有一个问题,那就是如果加锁成功后,client端宕机或者由于网络原因导致没有解锁,那么其他client就无法对该key_id进行加锁并且无法释放了。为了能够让锁失效,需要在应用层加上定时任务,去删除过期还未解锁的记录,比如删除2分钟前未解锁的伪代码如下:

1
2
def clear_timeout_lock : 
exec sql: delete from lock_table where update_time < ADDTIME(NOW(), '-00:02:00')

因为单实例DB的TPS一般为几百,所以基于DB的分布式性能上限一般也是1k以下,一般在并发量不大的场景下该分布式锁是满足需求的,不会出现性能问题。

不过DB作为分布式锁服务需要考虑单点问题,对于分布式系统来说是不允许出现单点的,一般通过数据库的同步复制,以及使用vip切换Master就能解决这个问题。

以上DB分布式锁是通过insert来实现的,如果加锁的数据已经在数据库中存在,那么用select xxx where key_id = xxx for udpate方式来做也是可以的。

缺点

1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中库数据已经存在了。

缺陷解决方案

  1. 数据库是单点?
    搞两个数据库,数据实现双向同步。一旦挂掉快速切换到备库上。

  2. 没有失效时间?
    只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。

  3. 非阻塞的?
    搞一个while循环,直到insert成功再返回成功。

  4. 非重入的?
    在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

基于Redis的分布式锁

原理

基于redis的锁是通过以下命令对资源进行加锁:

1
set key_id key_value NX PX expireTime

其中,set nx命令只会在key不存在时给key进行赋值,px用来设置key过期时间,key_value一般是随机值,用来保证释放锁的安全性(释放时会判断是否是之前设置过的随机值,只有是才释放锁)。由于资源设置了过期时间,一定时间后锁会自动释放。

set nx保证并发加锁时只有一个client能设置成功(redis内部是单线程,并且数据存在内存中,也就是说redis内部执行命令是不会有多线程同步问题的)。

Java实现

maven依赖

首先通过Maven引入Jedis开源组件,在pom.xml文件加入下面的代码:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

加锁代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RedisLock {

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";

/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}

从上面的代码可以看到,加锁的核心代码只有一行:

1
jedis.set(String key, String value, String nxxx, String expx, int time)

这个set()方法一共有五个形参:

  1. key,使用key来当锁,因为key是唯一的。

  2. value,传的是requestId,为什么有key作为锁了还要用到value?

    原因是要保证锁的可靠性,分布式锁还要满足解铃还须系铃人,通过给value赋值为requestId,就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。

    requestId可以使用UUID.randomUUID().toString()方法生成。

  3. nxxx,这个参数是NX,意思是SET IF NOT EXIST,即当key不存在时,进行set操作;若key已经存在,则不做任何操作;

  4. expx,这个参数PX,意思是要给这个key加一个过期的设置,具体时间由第五个参数决定。

  5. time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:

  • 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。
  • 已有锁存在,不做任何操作。

上面的加锁代码已经满足了分布式锁的排他性和避免死锁的特点。下面列几个错误示例:

错误示例1

比较常见的错误示例就是使用jedis.setnx()jedis.expire()组合实现加锁,代码如下:

1
2
3
4
5
6
7
public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}
}

乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。

网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的set()方法。

错误示例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {
long expires = System.currentTimeMillis() + expireTime;
String expiresStr = String.valueOf(expires);

// 如果当前锁不存在,返回加锁成功
if (jedis.setnx(lockKey, expiresStr) == 1) {
return true;
}

// 如果锁存在,获取锁的过期时间
String currentValueStr = jedis.get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
String oldValueStr = jedis.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
return true;
}
}
// 其他情况,一律返回加锁失败
return false;
}

这一种错误示例就比较难以发现问题,执行过程如下:

  • 通过setnx()方法尝试加锁,如果当前锁不存在,返回加锁成功。

  • 如果锁已经存在则获取锁的过期时间,和当前时间比较,如果锁已经过期,则设置新的过期时间,返回加锁成功。

那么这段代码问题在哪里?

  1. 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。
  1. 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但这个客户端的锁过期时间可能被其他客户端覆盖。
  1. 锁不具备拥有者标识,即任何客户端都可以解锁。

解锁代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RedisLock {
private static final Long RELEASE_SUCCESS = 1L;

/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}

可以看到,解锁只需要两行代码就搞定了!

第一行代码,写了一个简单的Lua脚本代码。

第二行代码,将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKeyARGV[1]赋值为requestIdeval()方法是将Lua代码交给Redis服务端执行。

💛 那为什么要用Lua脚本呢?

因为要确保上述操作是原子性的。首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。

那为什么执行eval()方法可以确保原子性,源于Redis的特性,下面是官网对eval命令的部分解释:

简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。

另附加锁的Lua脚本

1
String script = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end  end";
错误示例1
1
2
3
public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}

这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的。

错误示例2
1
2
3
4
5
6
7
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}

如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。

那么是否真的有这种场景?

答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。

基于ZooKeeper的分布式锁

Zookeeper重要的3个特征是:zab协议、node存储模型和watcher机制。

通过zab协议保证数据一致性,Zookeeper集群部署保证可用性,node存储在内存中,提高了数据操作性能,使用watcher机制,实现了通知机制(比如加锁成功的client释放锁时可以通知到其他client)。

Zookeeper node模型支持临时节点特性,即client写入的数据是临时数据,当客户端宕机时临时数据会被删除,这样就不需要给锁增加超时释放机制了。

当针对同一个path并发多个创建请求时,只有一个client能创建成功,这个特性用来实现分布式锁。注意:如果client端没有宕机,由于网络原因导致Zookeeper服务与client心跳失败,那么Zookeeper也会把临时数据给删除掉的,这时如果client还在操作共享数据,是有一定风险的。

基于Zookeeper实现分布式锁,相对于基于Redis和DB的实现来说,使用上更容易,效率与稳定性较好。curator封装了对Zookeeper的API操作,同时也封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等,使用curator进行分布式加锁示例如下:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.itstyle.seckill.distributedlock.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
* 基于curator的zookeeper分布式锁
* 这里我们开启5个线程,每个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。
* 开始执行main方法,通过ZooInspector监控/curator/lock下的节点如下图:
*/
public class CuratorUtil {
private static String address = "127.0.0.1:2181";

public static void main(String[] args) {
//1、重试策略:初试时间为1s 重试3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//2、通过工厂创建连接
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
//3、开启连接
client.start();
//4 分布式锁
final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
//读写锁
//InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 5; i++) {
fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
boolean flag = false;
try {
//尝试获取锁,最多等待5秒
flag = mutex.acquire(5, TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
if (flag) {
System.out.println("线程" + currentThread.getId() + "获取锁成功");
} else {
System.out.println("线程" + currentThread.getId() + "获取锁失败");
}
//模拟业务逻辑,延时4秒
Thread.sleep(4000);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (flag) {
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
}
}
}

下面再贴一下使用原生Zookeeper实现的分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.springboot.whb.study.distributedlock.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
* @author: whb
* @description: 原生Zookeeper实现分布式锁
*/
@Slf4j
public class ZooKeeperSession {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private ZooKeeper zookeeper;

public ZooKeeperSession() {
// 连接zookeeper server,是异步创建会话的,通过一个监听器+CountDownLatch,来确认真正建立了zk server的连接
try {
this.zookeeper = new ZooKeeper(
"localhost:2181",
50000,
new ZooKeeperWatcher());

//打印即使状态:验证其是不是异步的?
log.info(String.valueOf(zookeeper.getState()));

try {
// CountDownLatch:简而言之 初始化——非0;非0——等待;0——往下执行
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ZooKeeper session established......");
} catch (Exception e) {
log.error("连接zookeeper失败:{}", e);
}
}

/**
* 初始化实例
*/
public static void init() {
getInstance();
}

/**
* 建立zk session的watcher:
*/
private class ZooKeeperWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}

/**
* 静态内部类实现单例
*/
private static class Singleton {

private static ZooKeeperSession instance;

static {
instance = new ZooKeeperSession();
}

public static ZooKeeperSession getInstance() {
return instance;
}
}

/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}

/**
* 重试获取分布式锁
*
* @param adId
*/
public void acquireDistributedLock(Long adId) {
String path = "/ad-lock-" + adId;
try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info("success to acquire lock for adId = " + adId);
} catch (Exception e) {
// 如果锁node已经存在了,就是已经被别人加锁了,那么就这里就会报错
// NodeExistsException
int count = 0;
while (true) {
try {
Thread.sleep(1000);
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
count++;
log.info("the " + count + " times try to acquire lock for adId = " + adId);
continue;
}
log.info("success to acquire lock for adId = " + adId + " after " + count + " times try......");
break;
}
}
}

/**
* 释放掉分布式锁
*
* @param adId
*/
public void releaseDistributedLock(Long adId) {
String path = "/ad-lock-" + adId;
try {
zookeeper.delete(path, -1);
log.info("release the lock for adId = " + adId);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
Long adId = 1L;
ZooKeeperSession zkSession = ZooKeeperSession.getInstance();
//1、获取锁
zkSession.acquireDistributedLock(adId);
//锁没有被释放,再次获取将被阻塞,直到获取成功
//zkSession.acquireDistributedLock(adId);
//2、执行一些修改共享资源的操作
log.info("正在执行一些修改共享资源的操作!");

//3、释放锁
zkSession.releaseDistributedLock(adId);
}
}

总结

从上面介绍的3种分布式锁的设计与实现中,可以看出每种实现都有各自的特点,针对潜在的问题有不同的解决方案,归纳如下:

性能:Redis > Zookeeper > DB

避免死锁:DB通过应用层设置定时任务来删除过期还未释放的锁,Redis通过设置超时时间来解决,而Zookeeper是通过临时节点来解决。

可用性:DB可通过数据库同步复制,vip切换master来解决;Redis可通过集群或者master-slave方式来解决;Zookeeper本身自己是通过zab协议集群部署来解决的。注意,DB和Redis的复制一般都是异步的,也就是说某些时刻分布式锁发生故障可能存在数据不一致问题,而Zookeeper本身通过zab协议保证集群内(至少n/2+1个)节点数据一致性。

锁唤醒:DB和Redis分布式锁一般不支持唤醒机制(也可以通过应用层自己做轮询检测锁是否空闲,空闲就唤醒内部加锁线程),Zookeeper可通过本身的watcher/notify机制来做。

使用分布式锁,安全性上和多线程(同一个进程内)加锁是没法比的,可能由于网络原因,分布式锁服务(因为超时或者认为client挂了)将加锁资源给删除了,如果client端继续操作共享资源,此时是有隐患的。

因此,对于分布式锁,一个是要尽量提高分布式锁服务的可用性,另一个就是要部署同一内网,尽量降低网络问题发生几率。

这样来看,貌似分布式锁服务不是“完美”的,那么该如何选择分布式锁呢?最好是结合自己的业务实际场景,来选择不同的分布式锁实现,一般来说,基于Redis的分布式锁服务应用较多。


本文标题:分布式学习-分布式锁

文章作者:王洪博

发布时间:2019年08月24日 - 11:08

最后更新:2019年11月14日 - 08:11

原始链接:http://whb1990.github.io/posts/12dd52e3.html

▄︻┻═┳一如果你喜欢这篇文章,请点击下方"打赏"按钮请我喝杯 ☕
0%