服务注册发现:Eureka源码剖析(11)核心机制之Eureka Server集群机制
- 什么是eureka集群?
一组互相复制数据,完全对等的eureka节点。通过配置文件指定,也可以热刷新(扩容缩容)
集群机制的实现?
在eureka server启动的时候,初始化服务器上下文阶段,会启动eureka集群信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20class: DefaultEurekaServerContext
public void initialize() {
logger.info("Initializing ...");
// 启动eureka集群,这个集群是每个eureka server节点都维护的一组内存对象。
// 他的作用是每当本节点发生变更,比如有服务实例注册、下线、心跳续约等事件发生时,会修改这组对象
// 然后这组对象负责将修改同步到对应的eureka server节点上去。
peerEurekaNodes.start();
try {
// 初始化服务注册表
// 这里其实是将所有的eureka server节点上的注册表都抓取过来,合并一下
// 然后存放到自己的本地注册表中去
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}- 启动eureka集群,这个集群是每个eureka server节点都维护的一组内存对象。
- 他的作用是每当本节点发生变更,比如有服务实例注册、下线、心跳续约等事件发生时,会修改这组对象
- 然后这组对象负责将修改同步到对应的eureka server节点上去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46class: PeerEurekaNodes
public void start() {
// 搞了一个单线程的线程池 用来跑eureka集群节点的更新任务
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 这里是更新自己内存中的eureka集群,
// 对新的eureka节点进行创建实例,然后添加到集群进去
// 对已经无效的eureka 节点清除出这个集群中
// 这里被Spring Cloud Netflix Eureka拿来做eureka集群的在线扩缩容
// 更新eureka server集群信息
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}- 创建一个线程池
- 解析配置文件中配置的eureka server地址列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47class: PeerEurekaNodes
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
// 这里toShutdown就是需要缩容的eureka server节点
// 实现很简单,就是以最新的服务地址列表为准,除此之外的都需要干掉
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
// 这里toAdd就是需要扩容的eureka server节点
// 实现很简单,就是拿新增的服务地址列表
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 这里处理缩容的逻辑
// 遍历现在的集群里的节点,删除缩容的节点,并且关停缩容的节点
// 关停缩容的节点 其实没啥特别的 就是释放一些线程池资源而已
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 这里处理扩容的逻辑
// 遍历需要扩容的节点,创建之,加入集群
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}集群的扩容和缩容
扩容
- 根据最新的eureka server地址列表
- 与老的服务列表做比对,算出新增的服务地址,这里就是需要扩容的节点
- 创建节点,加入集群
缩容
- 根据最新的eureka server地址列表
- 与老的服务列表做比对,算出待删除的服务地址,这里就是需要缩容的节点
- 从集群中剔除节点,释放节点的线程池资源
集群同步机制怎么实现的?
每当服务注册、下线、取消、状态变更时,都会执行PeerAwareInstanceRegistryImpl的replicateToPeers的方法,这个方法会将这些动作同步到内存集群的队列中,然后有定时任务,会分派这些动作,将这些动作打成一批批的包,分批请求对应的eureka server。
在内存集群中,复制任务的接收、分派、消费这套流程采用了三层队列来实现,下面我们分析一下这个机制
每个内存集群中的节点都有两个任务分派器TaskDispatcher
每个TaskDispatcher有两个执行器组成
- 一个acceptorExecutor,负责接收任务
- 如果重新处理队列、接收队列、待处理任务任意一个不为空,就执行如下逻辑;否则就睡10秒钟
- 任务先进接收队列(acceptorQueue)
- 有个叫AcceptorRunner的后台线程不断的消费接收队列
- 不断拉取重新处理队列(reprocessQueue)
- 如果不为空,将里面的任务放到待执行任务(pendingTasks)中去; 插入处理顺序队列(processingOrder)
- 如果满了,累加重新处理队列已经清除的任务个数;清空重新处理队列
- 不断拉取接收队列
- 如果待执行任务满了,处理顺序队列里删除这个任务
- 如果没满,就放入待执行任务中去
- 如果刚才放入待执行任务中的那个任务是新任务,就将这个任务加入处理顺序队列中去
- 不断拉取重新处理队列(reprocessQueue)
- 分派批处理任务
- 积攒到一批或者待执行任务中最老的任务已经延时超过500毫秒,就将待执行任务任务打成一批,放到批处理任务工作队列里
- 分派单项处理任务
- 如果重新处理队列、接收队列、待处理任务任意一个不为空,就执行如下逻辑;否则就睡10秒钟
- 一个taskExecutor,负责执行任务
- 这里会从批处理任务队列里,一批批的消费,将一批任务一次请求发送到对应的eureka server节点
- 一个acceptorExecutor,负责接收任务
总结