Eureka Server模块分析

1、Eureka Server原来是一个简单的Servlet应用

Eureka Server模块非常简单,有价值的就一个web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<!-- 这个EurekaBootStrap 就是Eureka Server的入口代码,我们从这个类开始分析就对了 -->
<listener>
<listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
</listener>

<filter>
<filter-name>jersey</filter-name>
<filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
<init-param>
<param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
<param-value>/(flex|images|js|css|jsp)/.*</param-value>
</init-param>
<init-param>
<param-name>com.sun.jersey.config.property.packages</param-name>
<param-value>com.sun.jersey;com.netflix</param-value>
</init-param>

<!-- GZIP content encoding/decoding -->
<init-param>
<param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
</init-param>
<init-param>
<param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
</init-param>
</filter>

<filter-mapping>
<filter-name>statusFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

<filter-mapping>
<filter-name>requestAuthFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

<!-- Uncomment this to enable rate limiter filter.
<filter-mapping>
<filter-name>rateLimitingFilter</filter-name>
<url-pattern>/v2/apps</url-pattern>
<url-pattern>/v2/apps/*</url-pattern>
</filter-mapping>
-->

<filter-mapping>
<filter-name>gzipEncodingEnforcingFilter</filter-name>
<url-pattern>/v2/apps</url-pattern>
<url-pattern>/v2/apps/*</url-pattern>
</filter-mapping>

<filter-mapping>
<filter-name>jersey</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

<welcome-file-list>
<welcome-file>jsp/status.jsp</welcome-file>
</welcome-file-list>

看到这个,一下子回想起多年以前,我们打war包部署到Tomcat中发布应用的时候。可以看到web.xml定义了各种Listener、filter、filter-mapping,比如Listener就是在Servlet容器启动时跟随着初始化,做一些事情。原来,Eureka Server是用最传统的Servlet那种方式开发的一个简单web应用。
在test目录下,发现有一个EurekaClientServerRestIntegrationTest单元测试类,里面的setUp()方法验证了我们的猜想:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@BeforeClass
public static void setUp() throws Exception {
// 注入一些Eureka配置数据(这是为了测试而mock的配置)
injectEurekaConfiguration();
// 启动服务
startServer();
createEurekaServerConfig();

httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder()
.withClientName("testEurekaClient")
.withConnectionTimeout(1000)
.withReadTimeout(1000)
.withMaxConnectionsPerHost(1)
.withMaxTotalConnections(1)
.withConnectionIdleTimeout(1000)
.build();

jerseyEurekaClient = httpClientFactory.newClient(new DefaultEndpoint(eurekaServiceUrl));

ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
jerseyReplicationClient = JerseyReplicationClient.createReplicationClient(
eurekaServerConfig,
serverCodecs,
eurekaServiceUrl
);
}

这个startServer()方法显式的证明了我们的猜想。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void startServer() throws Exception {
// 找到本地打好的Eureka server的war包
File warFile = findWar();
// 启动一个jetty server用于测试
server = new Server(8080);
// 设置jetty的一些配置,比如webapp上下文之类的东西
WebAppContext webapp = new WebAppContext();
webapp.setContextPath("/");
webapp.setWar(warFile.getAbsolutePath());
server.setHandler(webapp);

// 启动jetty
server.start();

eurekaServiceUrl = "http://localhost:8080/v2";
}

2、找到了入口代码com.netflix.eureka.EurekaBootStrap

EurekaServer启动核心步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class EurekaBootStrap implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent event) {
try {
initEurekaEnvironment();
initEurekaServerContext();

ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}

@Override
public void contextDestroyed(ServletContextEvent event) {
try {
logger.info("{} Shutting down Eureka Server..", new Date());
ServletContext sc = event.getServletContext();
sc.removeAttribute(EurekaServerContext.class.getName());

destroyEurekaServerContext();
destroyEurekaEnvironment();

} catch (Throwable e) {
logger.error("Error shutting down eureka", e);
}
logger.info("{} Eureka Service is now shutdown...", new Date());
}
}

原来EurekaBootStrap实现了ServletContextListener接口,那么Servlet容器会在初始化ServletContext时,先调用contextInitialized方法。我们接下来重点分析这个方法。

环境初始化 & 基于单例模式的配置管理器

EurekaServer启动之初始化配置管理器

首先,Eureka Server会初始化类似于环境变量这样的一些列配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void initEurekaEnvironment() throws Exception {
String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
if (dataCenter == null) {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
} else {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}

String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
}
}
  • ConfigurationManager.getConfigInstance() 这个就是初始化配置管理器
  • 读取eureka.datacentereureka.environment配置,如果用户没有自定义就给一个默认值。数据中心的默认名称为default,eureka默认是test环境

ConfigurationManager隶属于Netflix Archaius项目,这是一个配置增强库,不仅提供了读取本地配置文件、环境变量等基础功能,还提供了定时拉取远程配置数据源到本地等高级功能。在这里我们可以简单理解为就是一个读取本地配置的工具类就可以了。

有关Archaius的更多信息请参考:https://github.com/Netflix/archaius

Eureka Server内大量运用了Archaius的ConfigurationManager,这个组件被设计成了单例的,这也符合我们的直觉。他的单例模式采用了double check + volatile来实现,这种方式在开源项目里算是常客了。

1
2
3
4
5
6
7
8
9
10
11
12
static volatile AbstractConfiguration instance = null;

public static AbstractConfiguration getConfigInstance() {
if (instance == null) {
synchronized (ConfigurationManager.class) {
if (instance == null) {
instance = getConfigInstance(Boolean.getBoolean(DynamicPropertyFactory.DISABLE_DEFAULT_CONFIG));
}
}
}
return instance;
}

有关double check + volatile的单例模式实现,他的来龙去脉可以在这里看到https://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html

配置文件加载以及面向接口的配置项读取

设计亮点:面向接口的配置文件读取,DefaultEurekaServerConfig实现了EurekaServerConfig接口,EurekaServerConfig定义了大量配置获取的方法,这种实现方式让使用方非常清晰明了,通过方法名就获取到自己想要的配置,而且返回值还是类型友好的。

Eureka注册中心默认配置继承关系

Eureka注册中心默认配置组件的实例化过程,本质是读取classpath下的eureka-server.properties文件内容

读取eureka-server属性文件

实质上,Eureka注册中心默认配置组件是对配置管理器的一层很薄的封装,他每个获取配置的方法都是委托给配置管理器处理的。

eureka注册中心默认配置的核心结构

1
2
3
4
protected void initEurekaServerContext() throws Exception {
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
....
}
1
2
3
4
5
6
7
8
9
10
class DefaultEurekaServerConfig {
private static final DynamicStringProperty EUREKA_PROPS_FILE = DynamicPropertyFactory
.getInstance().getStringProperty("eureka.server.props",
"eureka-server");

private void init() {
String eurekaPropsFile = EUREKA_PROPS_FILE.get();
ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
}
}
  • 使用配置管理器加载eureka-server.proerties配置文件
  • loadCascadedPropertiesFromResources这步拼接了eureka.server.props配置得到的文件名 + .properties后缀,得到一个完整的eureka server配置文件名,然后读取到配置管理器中。

基于构造器模式的服务实例构造

1
2
3
4
5
6
ApplicationInfoManager applicationInfoManager = null;

if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig(); // 默认就是走这个
1
2
3
4
5
public PropertiesInstanceConfig(String namespace, DataCenterInfo info) {
super(info);
// 这步其实就是去加载 eureka-client.properties配置文件,这个配置文件名可用eureka.client.props配置指定
this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
}

此处使用构造器模式构建InstanceInfo,可以看到数据都来自EurekaInstanceConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class EurekaConfigBasedInstanceInfoProvider
public synchronized InstanceInfo get() {
// 主要就是从eureka实例配置中读取数据,构建服务实例信息对象(InstanceInfo),一个服务实例里面包含一个租约信息对象(LeaseInfo,也就是心跳信息)
if (instanceInfo == null) {
// 续约 心跳信息
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

// 服务实例信息
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);

builder.setNamespace(config.getNamespace())
.setInstanceId(instanceId)
.setAppName(config.getAppname())
.setAppGroupName(config.getAppGroupName())
.setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress())
.setHostName(defaultAddress)
.setPort(config.getNonSecurePort())
.enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName())
.setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
.setASGName(config.getASGName())
.setHealthCheckUrls(config.getHealthCheckUrlPath(),
config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());

instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
}
return instanceInfo;
}

将自己作为Eureka Client完成复杂构造

1
2
3
4
5
6
protected void initEurekaServerContext() throws Exception {
// // 1、读取eureka client配置:这步其实就是去加载 eureka-client.properties配置文件,这个配置文件名可用eureka.client.props配置指定
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
// eureka server把自己也当做是一个eureka-client,也是一个服务实例,向其他的eureka-server注册
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
}

Eureka客户端的复杂实例化过程

  1. 保存EurekaClientConfig和TransportConfig
  2. 处理是否要拉取注册表和是否要将自己作为Eureka Client注册到eureka
  3. 支持调度的线程池
  4. 支持心跳的线程池
  5. 支持缓存刷新的线程池
  6. 实例化一个EurekaTransport,支持eureka client和eureka server底层网络通信的组件
  7. 如果需要抓取注册表,这里就会尝试抓取增量注册表,如果抓取失败则从备份里获取注册表
  8. 初始化调度任务
    1. 抓取服务注册表 默认间隔是 30秒 搞了一个CachedRefreshThread
    2. 向eureka server注册 心跳定时任务 默认间隔30秒; 服务实例复制器 默认40秒后开始执行 每次间隔30秒
    3. 服务状态变更监听器 如果开启,每次服务状态发生变更,就会通知远程的eureka server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 1. 保存EurekaClientConfig和TransportConfig
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();

// 2. 处理是否要拉取注册表和是否要将自己作为Eureka Client注册到eureka
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}

if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 3. 支持调度的线程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 4. 支持心跳的线程池
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff

// 5. 支持缓存刷新的线程池
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff

// 6. 实例化一个EurekaTransport,支持eureka client和eureka server底层网络通信的组件
eurekaTransport = new EurekaTransport();
// 初始化eurekaTransport内的组件
scheduleServerEndpointTask(eurekaTransport, args);

// 7. 如果需要抓取注册表,这里就会尝试抓取增量注册表,如果抓取失败则从备份里获取注册表
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}

// 初始化调度任务
// 1. 抓取服务注册表 默认间隔是 30秒 搞了一个CachedRefreshThread
// 2. 向eureka server注册 心跳定时任务 默认间隔30秒; 服务实例复制器 默认40秒后开始执行 每次间隔30秒
// 3. 服务状态变更监听器 如果开启,每次服务状态发生变更,就会通知远程的eureka server
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
}

服务上下文构造以及初始化

  • 实例化一个 可以感知eureka server集群的服务实例注册表

  • 构造了一个Eureka集群的节点(eureka集群中每个节点是一模一样的,完全对等的,因其采用peer to peer的复制方式)

  • 构造了一个EurekaServerContext(这个服务器上下文就是将上述所有构造好的东西都打包到这个对象里,包含了服务器需要的所有东西。然后这个Context被丢入了一个Holder中,以后需要使用的时候可以直接从Holder取)

  • 初始化注册中心服务器上下文(EurekaServerConetxt.initialize()

    • 启动eureka集群(peerEurekaNodes.start
      这里说一下,这里的集群是指每个eureka server中的一组节点对象,eureka的设计理念是通过维护这么一组对象,每当自己这个节点发生变化时,就修改自己内存中的这组对象,然后这组对象会负责将变更同步到对应的Eureka Server节点中去

    • 初始化服务注册表(registry.init)
      猜测:将集群中每个节点的注册表都拿过来,合并后发到自己本地的注册表里去。

      这里得说一下eureka同步的模型,采用的是push模式,即每个节点的收到的写请求(注册、下线)或自己发出的写请求(心跳)都会主动push到别的节点。
      所以此处的将每个节点的注册表都拿过来,这里的数据是来自于其他节点主动推过来的数据

    • 从相邻的节点拷贝注册表信息,如果拷贝失败就找下一个(registr.syncUp

总结

Eureka Server的启动流成发生在EurekaBootStrap中,这个监听器配置到了web.xml中,会被servlet容器初始化。

启动流程总结:

  • 初始化环境
  • 读取eureka-properties配置文件
  • 将自己作为服务实例InstanceInfo进行实例化,数据从eureka-client.properties中读取,同时构建了服务实例信息管理器ApplicationInfoManager
  • 将自己作为eureka-client,从eureka-client中读取eureka client配置,基于服务实例和eureka-client配置构造了DiscoveryEurekaClient
  • 构造感知eureka-server集群的注册表PeerAwareInstanceRegistry
  • 构造一个eureka集群的信息PeerEurekaNodes
  • 基于eureka server配置,注册表,eureka server集群、服务实例,来构造了一个eureka server上下文EurekaServerContext
  • EurekaServerContext初始化
    • 更新eureka server集群信息(节点新增、删除)
    • 基于eureka server集群信息初始化注册表
  • 从相邻的eureka server节点拷贝注册表
  • 注册监控

eureka server启动的流程图

Comments