Spring Cloud Eureka source code analysis

Server-side entry point

From the usage side:

You add the eureka-server dependency to pom.xml, and Spring Boot's auto-configuration mechanism takes over: the eureka-server jar's spring.factories file lists org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration.

Looking at that class, it is annotated with:

@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)

This means the auto-configuration only takes effect when an EurekaServerMarkerConfiguration.Marker bean exists.

This corresponds to the annotation:

@EnableEurekaServer

whose definition carries

@Import(EurekaServerMarkerConfiguration.class)
and EurekaServerMarkerConfiguration declares a bean of type Marker.

Marker is a switch class: only once it has been instantiated does the eureka-server configuration take effect.
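For context, this is the switch being flipped on the application side (a minimal sketch; the class name is an arbitrary example):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer // imports EurekaServerMarkerConfiguration, whose Marker bean unlocks EurekaServerAutoConfiguration
public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }

}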

Eureka usage architecture diagram


Client-side entry source

The eureka-client jar's spring.factories file lists org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.

Inside its RefreshableEurekaClientConfiguration class:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a
// problem
// when shutdown is called on the CloudEurekaClient where the
// ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the
// object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
The CloudEurekaClient constructor delegates to its Netflix parent:
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
super(applicationInfoManager, config, args);
which lands in Netflix's DiscoveryClient constructor:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer)
Near the end, that constructor starts the client's scheduled tasks:
initScheduledTasks();
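For orientation, this is roughly what gets scheduled there (a summary from memory, not a verbatim copy; the intervals are defaults and configurable):

// Conceptually, DiscoveryClient.initScheduledTasks() wires up three pieces of periodic work:
// 1. cacheRefresh           -- fetch the (delta) registry every registryFetchIntervalSeconds (default 30s)
// 2. heartbeat              -- renew the lease every renewalIntervalInSecs (default 30s)
// 3. instanceInfoReplicator -- watch the local InstanceInfo and (re)register whenever it becomes dirty

The sections below follow each of these paths.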

Registration

Client side

The InstanceInfoReplicator class

eventually calls the registration endpoint:

discoveryClient.register();
which ends up in AbstractJerseyEurekaHttpClient's register method:
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}

At this point you also notice that the class relies heavily on thread pools to run these tasks.

Eureka client request flow diagram

The scheduled task only performs the registration conditionally:

InstanceInfoReplicator's run() method:
public void run() {
try {
discoveryClient.refreshInstanceInfo();

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

That is, it only registers when dirtyTimestamp is non-null, i.e. the instance info has become dirty and needs to be (re)registered.

Server side

Building the Jersey server

@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader)

which effectively loads this resource class:

public class ApplicationsResource

As the diagram above shows, the client's register call lands in ApplicationResource's addInstance method.

Each service instance is abstracted as an InstanceInfo, and addInstance then calls:

registry.register(info, "true".equals(isReplication));
In Spring Cloud's InstanceRegistry class:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}

The call to super.register(info, isReplication) takes us into the AbstractInstanceRegistry class.

The container: service instances stored in memory

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
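Conceptually, registration boils down to putting a Lease into this two-level map, keyed first by app name and then by instance id. The following is a simplified sketch, not the actual AbstractInstanceRegistry code (locking, status overrides and recent-change bookkeeping are omitted):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.eureka.lease.Lease;

class RegistrySketch {

    void register(ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry,
                  InstanceInfo registrant, int leaseDuration) {
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        if (gMap == null) {
            // one inner map per application
            gMap = new ConcurrentHashMap<>();
            Map<String, Lease<InstanceInfo>> existing = registry.putIfAbsent(registrant.getAppName(), gMap);
            if (existing != null) {
                gMap = existing;
            }
        }
        // one lease per instance id; a later renew() only refreshes the lease's timestamp
        gMap.put(registrant.getId(), new Lease<>(registrant, leaseDuration));
    }
}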

At the end of AbstractInstanceRegistry's register method:

invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());

Following invalidateCache we find readWriteCacheMap, which acts as a cache; registration invalidates (removes) the affected entries from it.

Fetching a single application

Server-side source

As with registration, an ApplicationResource is obtained first:

@Path("{appId}")
public ApplicationResource getApplicationResource(
@PathParam("version") String version,
@PathParam("appId") String appId) {
CurrentRequestVersion.set(Version.toEnum(version));
try {
return new ApplicationResource(appId, serverConfig, registry);
} finally {
CurrentRequestVersion.remove();
}
}
applicationResource.getApplication()
which goes into the ResponseCacheImpl class:
public String get(final Key key) {
return get(key, shouldUseReadOnlyResponseCache);
}
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}

So the data is fetched from caches: readOnlyCacheMap and readWriteCacheMap.

readWriteCacheMap

Initialization: it is built when ResponseCacheImpl is constructed.

this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});

Initial capacity:

CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())

Expiration:

.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)

Removal listener:

.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})

Loader:

.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
Value value = generatePayload(key);
payload = getPayLoad(key, registry.getApplication(key.getName()));

So when readWriteCacheMap has no entry for a key, the loader goes back to the in-memory registry

ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

and reads the data from there.

readOnlyCacheMap

Select readOnlyCacheMap and look up its usages: it is initialized as a member field when ResponseCacheImpl is instantiated.

Searching its usages again, we find:

private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}

The timer task above is also created when ResponseCacheImpl is constructed:

if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}

Every responseCacheUpdateIntervalMs (30 seconds by default) it syncs entries from readWriteCacheMap into readOnlyCacheMap.

Summary

The Eureka server's three cache levels

The first level (the registry itself) changes in real time, with many instances of many services writing to the eureka-server, so it has to cope with heavy concurrency.

The second level (readWriteCacheMap) is a hot/cold-key cache over the first level and exposes an invalidation method; whenever the registry or the delta list changes, the affected entries are invalidated.

The third level (readOnlyCacheMap) is a cache over the read/write cache. Its main job is to stop frequent invalidation of the read/write cache from letting full- and delta-registry fetches hit the server's local registry directly.

The read/write cache was introduced because the server's local registry is a heavily read-and-written ConcurrentHashMap and needs the pressure taken off it.

To stay consistent with the server's local registry, the read/write cache is invalidated whenever the local registry is modified.

Why the read-only cache exists:

1. It shields the client-visible registry against short restarts and brief network jitter.

2. While entries in the second-level cache are invalidated, it keeps serving eureka-clients; if clients read the second-level cache directly, they could momentarily see no available instances (see the sketch below).
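Putting the three levels together, here is a minimal sketch of how they interact (the names mirror ResponseCacheImpl, but this is not the real implementation -- the real level-2 cache is a Guava LoadingCache and the payloads are serialized Applications, not plain strings):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ThreeLevelCacheSketch {
    final Map<String, String> readOnlyCacheMap  = new ConcurrentHashMap<>(); // level 3
    final Map<String, String> readWriteCacheMap = new ConcurrentHashMap<>(); // level 2
    final Function<String, String> generatePayload;                          // reads level 1 (the registry)

    ThreeLevelCacheSketch(Function<String, String> generatePayload) {
        this.generatePayload = generatePayload;
    }

    // register / cancel / status change: only level 2 is invalidated
    void invalidate(String key) {
        readWriteCacheMap.remove(key);
    }

    // timer task, every responseCacheUpdateIntervalMs (default 30s): copy level 2 into level 3
    void syncReadOnlyCache() {
        for (String key : readOnlyCacheMap.keySet()) {
            String fresh = readWriteCacheMap.computeIfAbsent(key, generatePayload);
            if (!fresh.equals(readOnlyCacheMap.get(key))) {
                readOnlyCacheMap.put(key, fresh);
            }
        }
    }
}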

The registry-fetch endpoints

Client-side source

DiscoveryClient.initScheduledTasks() creates the cache-refresh task:
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
refreshRegistry() then calls:
boolean success = fetchRegistry(remoteRegionsModified);
and fetchRegistry does either a full fetch or a delta fetch:
getAndStoreFullRegistry();
or
getAndUpdateDelta(applications);
Both paths end up calling the HTTP client (AbstractJerseyEurekaHttpClient):
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}

@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
return getApplicationsInternal("apps/delta", regions);
}
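How the client decides between the two calls (summarized from DiscoveryClient.fetchRegistry; paraphrased from memory rather than quoted):

// Paraphrased decision logic inside DiscoveryClient.fetchRegistry(forceFullRegistryFetch):
// - FULL fetch (getAndStoreFullRegistry): on the first fetch, when deltas are disabled
//   (eureka.client.disable-delta=true), or when the locally cached Applications is empty;
// - DELTA fetch (getAndUpdateDelta) otherwise: the delta is merged into the local copy and the
//   resulting apps hash code is compared with the one sent by the server -- on a mismatch the
//   client falls back to a full fetch to reconcile.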

Server side

In the ApplicationsResource class.

Full fetch:

@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {

boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}

// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}

Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);

Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}

Delta fetch:

@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}

String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}

CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}

Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);

final Response response;

if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey)).build();
}

CurrentRequestVersion.remove();
return response;
}

Heartbeat

The client-side heartbeat code is similar to registration and lives right next to it, so we will not walk through it again.

On the client, DiscoveryClient.renew() sends the heartbeat:
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}

Server side:

Eureka server heartbeat handling flow diagram

InstanceResource's renewLease method calls:
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
which is InstanceRegistry's renew method:
return super.renew(appName, serverId, isReplication);
and that delegates to PeerAwareInstanceRegistryImpl's renew method:
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
super.renew here is AbstractInstanceRegistry's renew method:
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
// renewal count for the last minute
renewsLastMin.increment();
// refresh the lease
leaseToRenew.renew();
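For reference, renewing a lease just pushes its expiry timestamp forward. The following is paraphrased from memory of com.netflix.eureka.lease.Lease, not a verbatim copy; note the well-known quirk that renew() adds the duration to the current time, so the effective expiry window is longer than lease-expiration-duration-in-seconds suggests:

public void renew() {
    // duration = lease duration in milliseconds (90s by default)
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

public boolean isExpired(long additionalLeaseMs) {
    return evictionTimestamp > 0
            || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs);
}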
// replicate the heartbeat to the other servers
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);

Deregistration

Client side

DiscoveryClient's shutdown method (the bean's destroyMethod) runs before the instance is destroyed and calls:
unregister();
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());

Server side

In the InstanceResource class:
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));

if (isSuccess) {
logger.debug("Found (Cancel): {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
logger.error("Error (cancel): {} - {}", app.getName(), id, e);
return Response.serverError().build();
}

}
registry.cancel eventually reaches AbstractInstanceRegistry's internalCancel method:
protected boolean internalCancel(String appName, String id, boolean isReplication) {
read.lock();
try {
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
}
} finally {
read.unlock();
}

synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews.
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}

return true;
}

How the client picks a server

In the RetryableEurekaHttpClient class:
// number of retries, which is also the number of eureka servers that will be tried
public static final int DEFAULT_NUMBER_OF_RETRIES = 3;
Its execute method walks through the candidate servers:
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}

currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}

try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}

// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}

So the method picks server addresses driven by the retry counter, from 0 up to 3; that is why 3 eureka servers is the effective default, and if the first one succeeds it simply returns.

Self-preservation

What the self-preservation mechanism is

When a Eureka Server node loses too many clients within a short period (for example a network partition leaves service instances unable to reach it), the node enters self-preservation mode (eureka.server.enable-self-preservation=true; it is on by default). In that mode the server protects the registry and stops removing entries from it, i.e. it will not deregister any service. Once the network recovers, the node automatically leaves self-preservation mode.

When it kicks in

Renews threshold: the number of client renewals per minute the Eureka Server expects to receive.

**Renews (last min)**: the total number of client renewals the Eureka Server actually received in the last minute.

Self-preservation turns on when, after a minute, Renews (last min) < Renews threshold.

In the AbstractInstanceRegistry class:
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}

numberOfRenewsPerMinThreshold is the Renews threshold shown on the dashboard.
expectedNumberOfClientsSendingRenews is the number of clients expected to send renewals (in practice, the total number of service instances).
getExpectedClientRenewalIntervalSeconds() returns the expected client renewal interval in seconds (default 30s).
getRenewalPercentThreshold() returns the self-preservation renewal percent threshold factor (default 0.85).

So:

Renews threshold = total number of service instances * (60 / renewal interval in seconds) * renewal percent threshold factor

Renews (last min) = total number of service instances * (60 / renewal interval in seconds)
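Plugging in numbers as a quick sanity check (10 instances, the default 30-second renewal interval and 0.85 threshold factor; the instance count is only an example):

int expectedClients = 10;               // expectedNumberOfClientsSendingRenews
int renewalIntervalSeconds = 30;        // getExpectedClientRenewalIntervalSeconds()
double renewalPercentThreshold = 0.85;  // getRenewalPercentThreshold()

// Renews threshold = 10 * (60 / 30) * 0.85 = 17
int renewsThreshold = (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);

// Renews (last min) when everything is healthy = 10 * (60 / 30) = 20
int renewsLastMin = expectedClients * (60 / renewalIntervalSeconds);

// Self-preservation kicks in once the actual renewals in a minute drop below 17,
// i.e. when more than roughly 15% of heartbeats go missing.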

In the PeerAwareInstanceRegistryImpl class:
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

isLeaseExpirationEnabled() is called during eviction to decide whether cleanup is allowed. If self-preservation is disabled, eviction is always allowed. If it is enabled, eviction is allowed only while the threshold is greater than 0 and the renewals in the last minute exceed the threshold; once the last-minute renewals fall below the threshold, nothing is evicted.

When the Renews threshold is recalculated

It is recalculated when an application registers and when one deregisters:

PeerAwareInstanceRegistryImpl.updateRenewsPerMinThreshold();

and by a scheduled task:

PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask();

Evicting expired instances

Spring lifecycle management

The EurekaServerInitializerConfiguration class implements SmartLifecycle.

Spring calls its start() method after startup.

eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
super.openForTraffic(applicationInfoManager,
count == 0 ? this.defaultOpenForTrafficCount : count);
super.postInit();
evictionTaskRef.set(new EvictionTask());

and here we find the EvictionTask class:

class EvictionTask extends TimerTask {

private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}

/**
* compute a compensation time defined as the actual time this task was executed since the prev iteration,
* vs the configured amount of time for execution. This is useful for cases where changes in time (due to
* clock skew or gc for example) causes the actual eviction task to execute later than the desired time
* according to the configured cycle.
*/
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}

long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}

long getCurrentTimeNano() { // for testing
return System.nanoTime();
}

}
The task calls evict(compensationTimeMs):
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");

if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}

// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}

// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;

int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);

String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
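A worked example of the eviction limit computed at the end of evict() (the registry size is an assumption for illustration):

int registrySize = 20;                                                       // getLocalRegistrySize()
double renewalPercentThreshold = 0.85;                                       // serverConfig.getRenewalPercentThreshold()
int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);  // 17
int evictionLimit = registrySize - registrySizeThreshold;                    // 3

// Even if 10 leases have expired, only min(10, 3) = 3 instances are evicted in this run;
// the rest wait for later runs, so one pass can never wipe out the registry.
int toEvict = Math.min(10, evictionLimit);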

Replication between Eureka servers

Spring lifecycle management

Again, EurekaServerInitializerConfiguration implements SmartLifecycle.

Spring calls its start() method after startup.

eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
initEurekaServerContext();
int registryCount = this.registry.syncUp();
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;

for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}

Whenever a Eureka Server receives a register, renew, or cancel operation from a Eureka Client, it syncs the change to the other nodes in the cluster at a fixed interval (500 ms by default):

replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
For each peer node this ends up in replicateInstanceActionsToPeers:
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}

The handlers for register, cancel, renew, and the other operations all check isReplication, which comes from the request header x-netflix-discovery-replication. A Eureka Client's own request carries isReplication=false; the node that receives it then replicates the change to the other Eureka Server nodes with isReplication=true, meaning the information was copied from another server node, so the receiving peers do not propagate it any further. This avoids replication loops.
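A sketch of the guard that enforces this (paraphrased from memory of PeerAwareInstanceRegistryImpl.replicateToPeers, not a verbatim copy):

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
    if (isReplication) {
        return; // the change already came from a peer -- do not forward it again
    }
    for (PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
        if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
            continue; // never replicate to ourselves
        }
        replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
}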

CAP

C: Consistency

A: Availability

P: Partition tolerance

P is a must in a clustered system; without it you are back to a single-node service.

|  | Zookeeper | Eureka |
| --- | --- | --- |
| Design principle | CP | AP |
| Strength | strong data consistency | high availability of the registry service |
| Weakness | a network partition disrupts leader election, and beyond a threshold the whole cluster becomes unavailable | data may be inconsistent between server nodes, and between client and server |
| Best fit | single-datacenter clusters with strong consistency requirements | cloud / multi-datacenter deployments where registry availability matters most |

Tuning and parameter settings

Server side

Based on the self-preservation source analysis, these can be tuned to fit the project:

eureka:
  server:
    # self-preservation: decide based on how many services you run
    enable-self-preservation: false
    # self-preservation threshold factor
    renewal-percent-threshold: 0.85
    # interval between eviction runs
    eviction-interval-timer-in-ms: 1000

Based on the registry-fetch source:

eureka:
  server:
    # stop serving the registry from readOnlyCacheMap
    use-read-only-response-cache: false
    # sync interval between readWriteCacheMap and readOnlyCacheMap
    response-cache-update-interval-ms: 1000

Client side

Based on the source for how the client picks a server,

clients can be configured as follows.

Client A

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

Client B

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8762/eureka/,http://localhost:8761/eureka/

In practice this is rarely necessary; see below for how much load a single server can take.

Client settings for registry fetching and heartbeats

eureka:
  client:
    # how often to refresh the local registry
    registry-fetch-interval-seconds: 5
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
  instance:
    # heartbeat (lease renewal) interval
    lease-renewal-interval-in-seconds: 10
    # how long the server waits after the last received heartbeat before the instance may be removed (default 90s)
    lease-expiration-duration-in-seconds: 10

With these parameters in mind:

1. You can reduce the delay before instances show up as online or offline.

2. Self-preservation: decide whether to enable it based on your network and service conditions.

3. Taking a service offline: first stop the client service, then send the deregistration request.

Estimating how many clients a eureka-server can support

Suppose we have a large distributed system: 100 services, each deployed on 20 machines, each machine a standard 4-core / 8 GB box.

That means 100 * 20 = 2000 service instances on 2000 machines.

Each instance embeds a Eureka Client component that calls the Eureka Server every 30 seconds to pull the changed registry.

Each instance's Eureka Client also sends a heartbeat to the Eureka Server every 30 seconds.

So how many requests per second does the Eureka Server, acting as the registry, receive, and how many per day?

By the standard math, each service instance makes 2 registry-fetch requests and 2 heartbeat requests per minute.

That is 4 requests per instance per minute, so 2000 instances generate 8000 requests per minute.

Per second that is about 8000 / 60 ≈ 133, so roughly estimate the Eureka Server at about 150 requests per second.

Per day that is 8000 * 60 * 24 = 11,520,000, i.e. on the order of ten million requests a day.

Load-testing the register and renew endpoints above shows responses coming back in roughly 0.1 s even at high request volumes, which gives a sense of how much a single server can absorb.
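The same back-of-the-envelope estimate as a snippet (the service and instance counts are the example's assumptions, not measurements):

int services = 100, instancesPerService = 20;
int instances = services * instancesPerService;            // 2000 instances

// each instance fetches the delta registry every 30s and heartbeats every 30s -> 2 + 2 per minute
int requestsPerInstancePerMinute = 2 + 2;
long requestsPerMinute = (long) instances * requestsPerInstancePerMinute;   // 8000
long requestsPerSecond = requestsPerMinute / 60;                            // ~133
long requestsPerDay    = requestsPerMinute * 60 * 24;                       // 11,520,000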

