分布式学习-一致性Hash算法

示例引入

比如现在有 N 个 Redis 服务器,不考虑使用Redis Cluster的情况下,那么如何将一个对象 object 映射到 N 个 Redis 上呢?

你很可能会采用类似下面的通用方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache:

求余算法: hash(object) % N

这种通用的计算方法有什么问题呢?

  1. 一个 redis 服务器 x 宕 掉了,这样所有映射到 redis x 的对象都会失效。怎么办?需要把 redis x 从 redis集群中移除,这时候 redis服务器 是 N-1 台,那么对应映射公式变成了 hash(object)%(N-1)

  2. 还有一种情况是由于访问量增多,需要添加 redis ,这时候 redis 是 N+1 台,映射公式变成了 hash(object)%(N+1)

1 和 2 意味着突然之间几乎所有的 redis缓存都失效了。对于服务器而言,这是一场灾难,所有的访问都会直接冲向后端服务器,对后端服务器造成压力,甚至是后端服务器瘫痪。
如何解决上面的问题呢?一致性hash算法就派上用场了。

一致性Hash算法

一致性Hash用于分布式缓存系统,将Key值映射到具体机器Ip上,并且增加和删除1台机器的数据移动量较小。

一致性Hash算法也是使用取模的方法,只是,刚才描述的取模法是对服务器的数量进行取模,而一致性Hash算法是对2^32取模。
简单来说,一致性Hash算法将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为0-2^32-1(即哈希值是一个32位无符号整形),整个哈希环如下:

一致性Hash环

整个空间按顺时针方向组织,圆环的正上方的点代表0,0点右侧的第一个点代表1,以此类推,2、3、4、5、6……直到2^32-1,也就是说0点左侧的第一个点代表2^32-1, 0和2^32-1在零点中方向重合,把这个由2^32个点组成的圆环称为Hash环。

以上面的场景为例,假设有4台缓存服务器,node1、node2、node3,node4,那么,在生产环境中,这4台服务器肯定有自己的IP地址或主机名,用它们各自的IP地址或主机名作为关键字进行哈希计算,使用哈希后的结果对2^32取模,可以使用如下公式:

hash(node1的IP地址) % 2^32

通过上面公式算出的结果一定能得到一个0到2^32-1之间的整数,就用得出的整数,代表服务器node1,既然这个整数肯定处于0到2^32-1之间,那么,上图中的hash环上必定有一个点与这个整数对应,也就是服务器node1,那么,服务器node1就可以映射到这个环上了,其他服务器依次类推即可得到相应的位置,如下图:
一致性Hash节点位置示例

接下来使用如下算法定位数据访问到相应服务器: 将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,遇到的第一台服务器就是该数据应该映射到的服务器。
例如我们有Object A、Object B、Object C、Object D四个数据对象,经过哈希计算后,在环空间上的位置如下:
一致性Hash数据存储示例

根据一致性哈希算法,数据A会被定为到Node A上,B被定为到Node B上,C被定为到Node C上,D被定为到Node D上。

一致性hash算法的容错性和扩展性

现在假设Node C不幸宕机,可以看到此时对象A、B、D不会受到影响,只有C对象被重定位到Node D。

一般的,在一致性哈希算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响。如下图所示:
减少一个节点

下面考虑另外一种情况,如果在系统中增加一台服务器Node X,如下图所示:
增加一个节点

此时对象Object A、B、D不受影响,只有对象C需要重定位到新的Node X 。

一般的,在一致性哈希算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响。

综上所述,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。

Hash环的数据倾斜问题

一致性Hash算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜(被缓存的对象大部分集中缓存在某一台服务器上)问题,例如系统中只有两台服务器,其环分布如下:
服务节点过少示例

此时必然造成大量数据集中到Node A上,而只有极少量会定位到Node B上。为了解决这种数据倾斜问题,一致性Hash算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器IP或主机名的后面增加编号来实现。

例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “Node A#1”、“Node A#2”、“Node A#3”、“Node B#1”、“Node B#2”、“Node B#3”的哈希值,于是形成六个虚拟节点:
引入虚拟节点

同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到“Node A#1”、“Node A#2”、“Node A#3”三个虚拟节点的数据均定位到Node A上。这样就解决了服务节点少时数据倾斜的问题。在实际应用中,通常将虚拟节点数设置为32甚至更大,因此即使很少的服务节点也能做到相对均匀的数据分布。

一致性Hash算法的Java实现

HashUtils工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.springboot.whb.study.consistentHashing;

/**
* @author: whb
* @description: Hash工具类
*/
public class HashUtils {
/**
* 使用FNV1_32_HASH算法计算服务器的Hash值
*
* @param str
* @return
*/
public static int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++) {
hash = (hash ^ str.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;

// 如果算出来的值为负数则取其绝对值
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}
}

不带虚拟节点

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.springboot.whb.study.consistentHashing;

import java.util.SortedMap;
import java.util.TreeMap;

/**
* @author: whb
* @description: 一致性Hash算法--不带虚拟节点的实现
*/
public class ConsistentHashingWithoutVirtualNode {
/**
* 待添加到Hash环的服务器
*/
private static String[] servers = {"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003", "127.0.0.1:7004"};
/**
* key表示服务器的Hash值,value表示服务器
*/
private static SortedMap<Integer, String> sortedMap = new TreeMap<>();

/**
* 初始化,将所有服务器放入sortedMap
*/
static {
for (int i = 0; i < servers.length; i++) {
int hash = HashUtils.getHash(servers[i]);
System.out.println("[" + servers[i] + "]加入集合中, 其Hash值为:" + hash);
sortedMap.put(hash, servers[i]);
}
System.out.println();
}

/**
* 得到应当路由到的结点
*
* @param key
* @return
*/
private static String getServer(String key) {
//得到该key的hash值
int hash = HashUtils.getHash(key);
//得到大于该Hash值的所有Map
SortedMap<Integer, String> subMap = sortedMap.tailMap(hash);
if (subMap.isEmpty()) {
//如果没有比该key的hash值大的,则从第一个node开始
Integer i = sortedMap.firstKey();
//返回对应的服务器
return sortedMap.get(i);
} else {
//第一个Key就是顺时针过去离node最近的那个结点
Integer i = subMap.firstKey();
//返回对应的服务器
return subMap.get(i);
}
}

public static void main(String[] args) {
String[] keys = {"斗帝", "斗圣", "斗尊", "斗宗", "斗皇", "斗王", "斗灵", "大斗师", "斗师", "斗者"};
for (String key : keys) {
System.out.println("[" + key + "]的hash值为:" + HashUtils.getHash(key)
+ ", 被路由到结点[" + getServer(key) + "]");
}
}
}

测试结果

不带虚拟节点的测试结果

带虚拟节点

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.springboot.whb.study.consistentHashing;

import com.springboot.whb.study.common.util.StringUtils;

import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* @author: whb
* @description: 一致性Hash算法--带虚拟节点的实现
*/
public class ConsistentHashingWithVirtualNode {
/**
* 待添加到Hash环的服务器
*/
private static String[] servers = {"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003", "127.0.0.1:7004"};

/**
* 真实节点列表,考虑到服务器的上线、下线,即添加、删除的场景会比较频繁
*/
private static List<String> realNodes = new LinkedList<>();

/**
* 虚拟节点,Key表示虚拟节点的Hash值,value表示虚拟节点的名称
*/
private static SortedMap<Integer, String> virtualNodes = new TreeMap<>();

/**
* 虚拟节点的数目
*/
private static final int VIRTUAL_NODES = 5;

/**
* 初始化,加入真实节点及虚拟节点
*/
static {
//先将原始的服务器添加到真实节点列表
for (int i = 0; i < servers.length; i++) {
realNodes.add(servers[i]);
}
//添加虚拟节点,遍历真实节点列表
for (String str : realNodes) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String virtualNodeName = str + "&&VN" + String.valueOf(i);
setServer(virtualNodeName);
}
}
System.out.println();
}

/**
* 添加虚拟节点
*
* @param virtualNodeName
*/
private static void setServer(String virtualNodeName) {
setServer(virtualNodeName, null);
}

/**
* 添加虚拟节点
*
* @param virtualNodeName
* @param hash
*/
private static void setServer(String virtualNodeName, Integer hash) {
hash = hash != null ? HashUtils.getHash(hash.toString()) : HashUtils.getHash(virtualNodeName);
if (StringUtils.isBlank(virtualNodes.get(hash))) {
virtualNodes.put(hash, virtualNodeName);
System.out.println("虚拟节点[" + virtualNodeName + "]被添加, hash值为:" + hash);
} else {
//解决Hash碰撞
setServer(virtualNodeName, hash);
}
}

/**
* 得到应当路由到的结点
*
* @param key
* @return
*/
private static String getServer(String key) {
//得到该key的hash值
int hash = HashUtils.getHash(key);
// 得到大于该Hash值的所有Map
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
String virtualNode;
if (subMap.isEmpty()) {
//如果没有比该key的hash值大的,则从第一个node开始
Integer i = virtualNodes.firstKey();
//返回对应的服务器
virtualNode = virtualNodes.get(i);
} else {
//第一个Key就是顺时针过去离node最近的那个结点
Integer i = subMap.firstKey();
//返回对应的服务器
virtualNode = subMap.get(i);
}
return virtualNode;
}

public static void main(String[] args) {
String[] keys = {"斗帝", "斗圣", "斗尊", "斗宗", "斗皇", "斗王", "斗灵", "大斗师", "斗师", "斗者"};
for (String key : keys) {
String server = getServer(key);
String realServer = server.split("&&VN")[0];
System.out.println("[" + key + "]的hash值为" +
HashUtils.getHash(key) + ", 被路由到虚拟结点[" + server + "],被路由到真实节点[" + realServer + "]");
}
}
}

测试结果

带虚拟节点的测试结果

本文标题:分布式学习-一致性Hash算法

文章作者:王洪博

发布时间:2019年05月06日 - 09:05

最后更新:2019年09月12日 - 10:09

原始链接:http://whb1990.github.io/posts/302399c9.html

▄︻┻═┳一如果你喜欢这篇文章,请点击下方"打赏"按钮请我喝杯 ☕
0%