上一篇介绍了 哈希算法和一致性哈希算法的原理,我们知道哈希算法在分布式场景应用中存在着定位问题。所有有一致性哈希算法。
今天我们就动手实现以下哈希算法。

可选性 回顾一下 哈希算法和一致性哈希算法

# 说明

以下多次出现服务端节点,客户端节点这两个名字,含义如下:

服务端节点: 在实际场景中,比如分布式缓存,上一篇文章中的例子,服务端节点就是多个 Redis 机器。

客户端节点: 就是要缓存的数据,这里使用这两个名词来代表不同的两个部分。

# 实现功能

以上篇中提及的分布式缓存的例子为背景实现一致性哈希算法,主要实现两个功能:

  • 新增服务节点

    创建一个哈希环,然后可以存放服务端节点信息。后续新增节点也能正常存储。

  • 根据客户端节点找到对应的服务节点

    传入客户端的信息,我们可以根据 client 的信息或者其他的信息进行哈希运算,然后确定存储的服务端节点。

  • 实现根据服务端节点进行删除

    模拟实现部分服务端不可用。即例子中的缓存节点挂掉。

# 实现方案

首先我们需要定义一个接口规范,规定好要实现的内容,比如,保存服务端节点信息,即新增操作,根据客户端节点找到服务节点的功能,即查询操作。

# 排序 + List 的实现方式

主要思路如下: 将所有的节点保存到一个 List 中。然后对 List 进行排序,当获取服务端节点的时候,只需要找到第一个 哈希值比他大的服务端节点的就可以了。

不考虑排序的时间复杂度: 最优时间复杂度:O (1),第一个节点就是目标节点。最坏情况下:O (n+1), 招了一圈都没有找到。所以平均的时间复杂度是 O (N)。
那排序的时间复杂度呢?最快的就是 O (NlogN);

综合下来,这种实现方案的时间复杂度就是: O (NlogN)。

主要实现代码如下 (文末完整代码,注释颇多):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class SortListConsistentHash implements ConsistentHash {

/**
* hash环容器
*/
private List<NodeBucket> hashCircle = null;

/**
* 哈希算法
* 默认使用 {@link ConsistentHash#hash }算法实现
*/
private final HashHandle<Node> hashHandle;

public SortListConsistentHash() {
this.hashHandle = this::hash;
virtualNumber = 1;
}

public SortListConsistentHash(HashHandle<Node> hashHandle) {
this.hashHandle = hashHandle;
virtualNumber = 1;
}

public SortListConsistentHash(HashHandle<Node> hashHandle, Integer virtualNumber) {
this.hashHandle = hashHandle;
this.virtualNumber = virtualNumber;
}

public SortListConsistentHash(Integer virtualNumber) {
this.hashHandle = this::hash;
this.virtualNumber = virtualNumber;
}

/**
* 虚拟节点的数目,默认为1
*/
private final Integer virtualNumber;

/**
* 新增服务节点
*
* @param value 服务节点
*/
@Override
public void add(ServerNode value) {
if (hashCircle == null) {
hashCircle = new ArrayList<>();
}
for (int i = 0; i < virtualNumber; i++) {
value.setVirtualNodeId(i);
hashCircle.add(DefaultNodeBucket.of(value, (node) -> hashHandle.hash(value)));
}
}

/**
* 排序
*/
@Override
public void sort() {
hashCircle.sort(Comparator.comparingInt(NodeBucket::getHash));
}

/**
* 找不到符合条件的第一个节点
*
* @param clientNode 根据 客户端节点的Hash值获取到目标服务端节点
* @return
*/
@Override
public ServerNode getFirstNode(ClientNode clientNode) {

Integer hash = hash(clientNode);

Optional<NodeBucket> first = hashCircle.stream().filter(item -> item.getHash() > hash).findFirst();

return first.map(NodeBucket::getNode).orElse(hashCircle.get(0).getNode());

}

/**
* 获取所有服务节点
*
* @return 所有服务端节点
*/
@Override
public List<ServerNode> getAllServerNodes() {
return this.hashCircle.stream().map(NodeBucket::getNode).collect(Collectors.toList());
}

/**
* 哈希算法
*
* @param node 节点的信息,可能是客户端,也可能是服务端
* @return 哈希值
*/
@Override
public Integer hash(Node node) {
if (node instanceof ServerNode) {
return defaultStringHash(node.getIdentifier() + "#" + ((ServerNode) node).getVirtualNodeId());
} else {
return defaultStringHash(node.getIdentifier());
}
}

@Override
public void delete(Node node) {
if (node instanceof ServerNode) {
this.hashCircle.remove(DefaultNodeBucket.of((ServerNode) node, hashHandle));
}
}

}

# 线性表遍历的实现方式

首先说明一下,这里使用的是线性表遍历的方式,并没有指定说使用的数组还是链表,根据具体场景来选择吧,实现方式略有不同。我用数组的形式来实现。

上一种实现方式,使用了排序导致了时间复杂度为 O (NlogN)。那么我不用排序行不行?

可以的!我们首先将服务端节点保存到数组中,然后根据客户端哈希值和服务端节点哈希值的差,找出最小的那个节点就可以了。每次遍历实现的话,遍历一次就可以了,时间复杂度为 O (N).

主要实现逻辑代码如下(文末有完整实现代码,注释颇多):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
public class TraverseArrayConsistentHash implements ConsistentHash {

/**
* 使用数组实现的哈希环
*/
private NodeBucket[] hashCircle = new NodeBucket[]{};

private final HashHandle<Node> hashHandle;

private static final int defaultSize = 10;

private static final int MaxSize = Integer.MAX_VALUE;

private int length = 0;

private int size = 0;


/**
* 虚拟节点的数目,默认为1
*/
private final Integer virtualNumber;

public TraverseArrayConsistentHash(Integer initialCapacity) {
if (initialCapacity > 0) {
this.size = initialCapacity;
this.hashCircle = new NodeBucket[initialCapacity];
} else {
throw new IllegalArgumentException("initialCapacity must > 0 ");
}

this.hashHandle = this::hash;
virtualNumber = 1;
}

public TraverseArrayConsistentHash(Integer initialCapacity, Integer virtualNumber) {
if (initialCapacity > 0) {
this.size = initialCapacity;
this.hashCircle = new NodeBucket[initialCapacity];
} else {
throw new IllegalArgumentException("initialCapacity must > 0 ");
}

this.hashHandle = this::hash;
this.virtualNumber = virtualNumber;
}

public TraverseArrayConsistentHash(Integer initialCapacity, Integer virtualNumber, HashHandle<Node> hashHandle) {
if (initialCapacity > 0) {
this.size = initialCapacity;
this.hashCircle = new NodeBucket[initialCapacity];
} else {
throw new IllegalArgumentException("initialCapacity must > 0 ");
}

this.virtualNumber = virtualNumber;
this.hashHandle = hashHandle;
}

public TraverseArrayConsistentHash() {
this.hashHandle = this::hash;
this.virtualNumber = 1;
}

@Override
public void add(ServerNode serverNode) {
// 检查大小是否需要扩容
checkSize();
for (int i = 0; i < virtualNumber; i++) {
serverNode.setVirtualNodeId(i);
this.hashCircle[length++] = DefaultNodeBucket.of(serverNode, hashHandle);
}
}


/**
* 检查数组大小
*/
private void checkSize() {
if (size == 0) {
this.size = defaultSize;
this.hashCircle = new NodeBucket[size];
}
if (size >= MaxSize) {
throw new IllegalArgumentException();
}
if (length + 1 > size) {
this.size = size << 1;
this.hashCircle = Arrays.copyOf(this.hashCircle, size, NodeBucket[].class);
}
}

@Override
public Integer hash(Node node) {
if (node instanceof ServerNode) {
return defaultStringHash(node.getIdentifier() + "#" + ((ServerNode) node).getVirtualNodeId());
} else {
return defaultStringHash(node.getIdentifier());
}
}

@Override
public void delete(Node node) {
for (int index = 0; index < size; index++) {
if (node.equals(hashCircle[index].getNode())) {
fastRemove(index);
break;
}
}
}

private void fastRemove(int index) {
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(hashCircle, index+1, hashCircle, index,
numMoved);
hashCircle[--size] = null;
}


@Override
public ServerNode getFirstNode(ClientNode clientNode) {
Integer hash = this.hashHandle.hash(clientNode);
ServerNode findNode = null;
int min = Integer.MAX_VALUE;
// 查找符合条件的服务端节点
for (int i = 0; i < length; i++) {
NodeBucket nodeBucket = this.hashCircle[i];
int difference;
if ((difference = Math.abs(nodeBucket.getHash() - hash)) < min) {
min = difference;
findNode = nodeBucket.getNode();
}
}
return findNode;
}

@Override
public List<ServerNode> getAllServerNodes() {
return Stream.of(this.hashCircle).map(NodeBucket::getNode).collect(Collectors.toList());
}

}

# SortedMap 实现方案

上面两种的实现方案,并非是最优的。根本原因就是数据结构的限制。 线性表决定了这一切。我们考虑换一种数据结构呢?

考虑下使用形结构

最快的树形数据结构,就是二叉平衡树了。二叉平衡树有两种 AVL 树和红黑树。

我们使用红黑树,因为红黑树的主要功能就是存储有序的数据,并且查询地效率是 O (logN)。

考虑到手写实现一个红黑树,着实有点复杂,这种我们使用 JDK 中的 TreeMap 来实现。

将所有的服务节点放到 TreeMap 中,这种结构天然支持排序的,所以我们只需要主要的实现找到服务节点的这个过程就好了。

首先计算出客户端的哈希值,查询出大于该哈希值的服务节点的子序列,如果子序列为空返回原来哈希环的第一个元素,否则,返回子序列的第一个元素即可。

主要实现代码如下 (文末有完整代码,注释真的多!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
public class SortedMapConsistentHash implements ConsistentHash {

private final SortedMap<Integer, ServerNode> hashCircle = new TreeMap<>();

private final HashHandle<Node> hashHandle;

/**
* 虚拟节点的数目,默认为1
*/
private final Integer virtualNumber;

public SortedMapConsistentHash() {
virtualNumber = 1;
hashHandle = this::hash;
}

public SortedMapConsistentHash(int virtualNumber, HashHandle<Node> hashHandle) {
this.virtualNumber = 1;
this.hashHandle = hashHandle;
}

public SortedMapConsistentHash(int virtualNumber) {
this.virtualNumber = virtualNumber;
this.hashHandle = this::hash;
}

@Override
public void add(ServerNode serverNode) {
// 实现虚拟节点
for (int i = 0; i < virtualNumber; i++) {
serverNode.setVirtualNodeId(i);
hashCircle.put(this.hashHandle.hash(serverNode), serverNode);
}
}

@Override
public Integer hash(Node node) {
if (node instanceof ServerNode) {
return defaultStringHash(node.getIdentifier() + "#" + ((ServerNode) node).getVirtualNodeId());
} else {
return defaultStringHash(node.getIdentifier());
}
}

@Override
public void delete(Node node) {
this.hashCircle.remove(hashHandle.hash(node));
}

@Override
public ServerNode getFirstNode(ClientNode clientNode) {
int hash = hash(clientNode);
// 获取大于客户端哈希值的子序列
SortedMap<Integer, ServerNode> subMap = hashCircle.tailMap(hash);

if (subMap.isEmpty()) {
// 子序列为空,获取哈希环的第一个节点
Integer key = hashCircle.firstKey();
return hashCircle.get(key);
} else {
// 子序列不为空,获取子序列的第一个节点
Integer key = subMap.firstKey();
return subMap.get(key);
}
}

@Override
public List<ServerNode> getAllServerNodes() {
return new ArrayList<>(this.hashCircle.values());
}

@Override
public ServerNode process(ClientNode clientNode) {
return getFirstNode(clientNode);
}
}

````
最后,我们测试一下,一致性哈希算法的结果:

```java
public class ConsistentHashDemo {

public static void main(String[] args) {
testSortListConsistentHash();
testSortedMapConsistentHash();
testTraverseArrayConsistentHash();
}

/**
* 测试,使用遍历的方式来实现一致性hash算法
*/
private static void testTraverseArrayConsistentHash() {
ConsistentHash sortedMapConsistentHash = new TraverseArrayConsistentHash();
testConsistentHashCommonPart(sortedMapConsistentHash);
}

/**
* 测试,使用SortMap实现的一致性哈希算法
*/
private static void testSortedMapConsistentHash() {
ConsistentHash sortedMapConsistentHash = new SortedMapConsistentHash(3);
testConsistentHashCommonPart(sortedMapConsistentHash);

}

/**
* 测试,使用数组和排序方式实现的 一致性哈希算法
*/
private static void testSortListConsistentHash() {
SortListConsistentHash sortListConsistentHashMap = new SortListConsistentHash();
testConsistentHashCommonPart(sortListConsistentHashMap);
}


/**
* 测试 consistentHash 算法,公共代码部分
*
* @param consistentHash 一致性哈希实例
*/
private static void testConsistentHashCommonPart(ConsistentHash consistentHash) {
Stream.of(
new ServerNode("192.168.0.1", UUID.randomUUID().toString(), "domain1.com"),
new ServerNode("192.168.0.2", UUID.randomUUID().toString(), "domain2.com"),
new ServerNode("192.168.0.3", UUID.randomUUID().toString(), "domain3.com"),
new ServerNode("192.168.0.4", UUID.randomUUID().toString(), "domain4.com"),
new ServerNode("192.168.0.5", UUID.randomUUID().toString(), "domain5.com"),
new ServerNode("192.168.0.6", UUID.randomUUID().toString(), "domain6.com")
).forEach(consistentHash::add);
String s1 = UUID.randomUUID().toString();
ServerNode serverNode1 = consistentHash.process(new ClientNode(s1));
out.println(serverNode1);
ServerNode serverNode1_1 = consistentHash.process(new ClientNode(s1));
out.println(serverNode1_1);
ServerNode serverNode1_2 = consistentHash.process(new ClientNode(UUID.randomUUID().toString()));
out.println(serverNode1_2);
}
}

在测试完成之后,我还对各种方案就进行了性能相关的简单测试,硬件: windows 8 核 16G,i7.
测试代码就不贴出来了,查看原文去看吧。

总结如下:

在节点数 小于 10000 的场景下: 数组遍历 > SortedMap > 排序。 即:方案 2 > 方案 3 > 方案 1。
在服务端节点数 大于 10000 的场景下: SortedMap > 数组变量 > 排序。即: 方案 3 > 方案 2 > 方案 1.

# 附录

# 最后

期望与你一起遇见更好的自己

期望与你一起遇见更好的自己