• 什么是eureka集群?
    一组互相复制数据,完全对等的eureka节点。通过配置文件指定,也可以热刷新(扩容缩容)
  • 集群机制的实现?

    在eureka server启动的时候,初始化服务器上下文阶段,会启动eureka集群信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    class: DefaultEurekaServerContext

    @PostConstruct
    @Override
    public void initialize() {
    logger.info("Initializing ...");
    // 启动eureka集群,这个集群是每个eureka server节点都维护的一组内存对象。
    // 他的作用是每当本节点发生变更,比如有服务实例注册、下线、心跳续约等事件发生时,会修改这组对象
    // 然后这组对象负责将修改同步到对应的eureka server节点上去。
    peerEurekaNodes.start();
    try {
    // 初始化服务注册表
    // 这里其实是将所有的eureka server节点上的注册表都抓取过来,合并一下
    // 然后存放到自己的本地注册表中去
    registry.init(peerEurekaNodes);
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    logger.info("Initialized");
    }
    • 启动eureka集群,这个集群是每个eureka server节点都维护的一组内存对象。
    • 他的作用是每当本节点发生变更,比如有服务实例注册、下线、心跳续约等事件发生时,会修改这组对象
    • 然后这组对象负责将修改同步到对应的eureka server节点上去。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    class: PeerEurekaNodes

    public void start() {
    // 搞了一个单线程的线程池 用来跑eureka集群节点的更新任务
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
    thread.setDaemon(true);
    return thread;
    }
    }
    );
    try {
    // 这里是更新自己内存中的eureka集群,
    // 对新的eureka节点进行创建实例,然后添加到集群进去
    // 对已经无效的eureka 节点清除出这个集群中
    // 这里被Spring Cloud Netflix Eureka拿来做eureka集群的在线扩缩容

    // 更新eureka server集群信息
    updatePeerEurekaNodes(resolvePeerUrls());
    Runnable peersUpdateTask = new Runnable() {
    @Override
    public void run() {
    try {
    updatePeerEurekaNodes(resolvePeerUrls());
    } catch (Throwable e) {
    logger.error("Cannot update the replica Nodes", e);
    }

    }
    };
    taskExecutor.scheduleWithFixedDelay(
    peersUpdateTask,
    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
    TimeUnit.MILLISECONDS
    );
    } catch (Exception e) {
    throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
    logger.info("Replica node URL: {}", node.getServiceUrl());
    }
    }
    • 创建一个线程池
    • 解析配置文件中配置的eureka server地址列表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    class: PeerEurekaNodes

    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {

    // 这里toShutdown就是需要缩容的eureka server节点
    // 实现很简单,就是以最新的服务地址列表为准,除此之外的都需要干掉
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);

    // 这里toAdd就是需要扩容的eureka server节点
    // 实现很简单,就是拿新增的服务地址列表
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    // 这里处理缩容的逻辑
    // 遍历现在的集群里的节点,删除缩容的节点,并且关停缩容的节点
    // 关停缩容的节点 其实没啥特别的 就是释放一些线程池资源而已
    if (!toShutdown.isEmpty()) {
    logger.info("Removing no longer available peer nodes {}", toShutdown);
    int i = 0;
    while (i < newNodeList.size()) {
    PeerEurekaNode eurekaNode = newNodeList.get(i);
    if (toShutdown.contains(eurekaNode.getServiceUrl())) {
    newNodeList.remove(i);
    eurekaNode.shutDown();
    } else {
    i++;
    }
    }
    }

    // 这里处理扩容的逻辑
    // 遍历需要扩容的节点,创建之,加入集群
    // Add new peers
    if (!toAdd.isEmpty()) {
    logger.info("Adding new peer nodes {}", toAdd);
    for (String peerUrl : toAdd) {
    newNodeList.add(createPeerEurekaNode(peerUrl));
    }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

    集群的扩容和缩容

    扩容

    • 根据最新的eureka server地址列表
    • 与老的服务列表做比对,算出新增的服务地址,这里就是需要扩容的节点
    • 创建节点,加入集群

    缩容

    • 根据最新的eureka server地址列表
    • 与老的服务列表做比对,算出待删除的服务地址,这里就是需要缩容的节点
    • 从集群中剔除节点,释放节点的线程池资源
  • 集群同步机制怎么实现的?

    每当服务注册、下线、取消、状态变更时,都会执行PeerAwareInstanceRegistryImpl的replicateToPeers的方法,这个方法会将这些动作同步到内存集群的队列中,然后有定时任务,会分派这些动作,将这些动作打成一批批的包,分批请求对应的eureka server。

    在内存集群中,复制任务的接收、分派、消费这套流程采用了三层队列来实现,下面我们分析一下这个机制

    • 每个内存集群中的节点都有两个任务分派器TaskDispatcher

    • 每个TaskDispatcher有两个执行器组成

      • 一个acceptorExecutor,负责接收任务
        • 如果重新处理队列、接收队列、待处理任务任意一个不为空,就执行如下逻辑;否则就睡10秒钟
          • 任务先进接收队列(acceptorQueue)
          • 有个叫AcceptorRunner的后台线程不断的消费接收队列
            • 不断拉取重新处理队列(reprocessQueue)
              • 如果不为空,将里面的任务放到待执行任务(pendingTasks)中去; 插入处理顺序队列(processingOrder)
              • 如果满了,累加重新处理队列已经清除的任务个数;清空重新处理队列
            • 不断拉取接收队列
              • 如果待执行任务满了,处理顺序队列里删除这个任务
              • 如果没满,就放入待执行任务中去
              • 如果刚才放入待执行任务中的那个任务是新任务,就将这个任务加入处理顺序队列中去
        • 分派批处理任务
          • 积攒到一批或者待执行任务中最老的任务已经延时超过500毫秒,就将待执行任务任务打成一批,放到批处理任务工作队列里
        • 分派单项处理任务
      • 一个taskExecutor,负责执行任务
        • 这里会从批处理任务队列里,一批批的消费,将一批任务一次请求发送到对应的eureka server节点
    • 总结

      eureka server同步任务批处理机制(1)

Comments