@Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) { // If we use the proxy of the ApplicationInfoManager we could run into a // problem // when shutdown is called on the CloudEurekaClient where the // ApplicationInfoManager bean is // requested but wont be allowed because we are shutting down. To avoid this // we use the // object directly. ApplicationInfoManager appManager; if (AopUtils.isAopProxy(manager)) { appManager = ProxyUtils.getTargetObject(manager); } else { appManager = manager; } CloudEurekaClientcloudEurekaClient=newCloudEurekaClient(appManager, config, this.optionalArgs, this.context); cloudEurekaClient.registerHealthCheck(healthCheckHandler); return cloudEurekaClient; }
LongdirtyTimestamp= instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Futurenext= scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
就是只有dirtyTimestamp时间戳不等于null时才去注册。就是变成脏的了需要去注册。
服务器端
构建jersey server
1 2 3
@Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader)
booleanisRemoteRegionRequested=null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); }
// Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyTypekeyType= Key.KeyType.JSON; StringreturnMediaType= MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; }
KeycacheKey=newKey(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions );
// If the delta flag is disabled in discovery or if the lease expiration // has been disabled, redirect clients to get all instances if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { return Response.status(Status.FORBIDDEN).build(); }
String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); }
protectedbooleaninternalCancel(String appName, String id, boolean isReplication) { read.lock(); try { CANCEL.increment(isReplication); Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { leaseToCancel = gMap.remove(id); } recentCanceledQueue.add(newPair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); InstanceStatusinstanceStatus= overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel == null) { CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); returnfalse; } else { leaseToCancel.cancel(); InstanceInfoinstanceInfo= leaseToCancel.getHolder(); Stringvip=null; Stringsvip=null; if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); recentlyChangedQueue.add(newRecentlyChangedItem(leaseToCancel)); instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); } } finally { read.unlock(); }
synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } }
@Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { List<EurekaEndpoint> candidateHosts = null; intendpointIdx=0; for (intretry=0; retry < numberOfRetries; retry++) { EurekaHttpClientcurrentHttpClient= delegate.get(); EurekaEndpointcurrentEndpoint=null; if (currentHttpClient == null) { if (candidateHosts == null) { candidateHosts = getHostCandidates(); if (candidateHosts.isEmpty()) { thrownewTransportException("There is no known eureka server; cluster server list is empty"); } } if (endpointIdx >= candidateHosts.size()) { thrownewTransportException("Cannot execute request on any known server"); }
try { EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient); if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) { delegate.set(currentHttpClient); if (retry > 0) { logger.info("Request execution succeeded on retry #{}", retry); } return response; } logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode()); } catch (Exception e) { logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace }
// Connection error or 5xx from the server that must be retried on another server delegate.compareAndSet(currentHttpClient, null); if (currentEndpoint != null) { quarantineSet.add(currentEndpoint); } } thrownewTransportException("Retry limit reached; giving up on completing the request"); }
@Override publicvoidrun() { try { longcompensationTimeMs= getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } }
/** * compute a compensation time defined as the actual time this task was executed since the prev iteration, * vs the configured amount of time for execution. This is useful for cases where changes in time (due to * clock skew or gc for example) causes the actual eviction task to execute later than the desired time * according to the configured cycle. */ longgetCompensationTimeMs() { longcurrNanos= getCurrentTimeNano(); longlastNanos= lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return0l; }
publicvoidevict(long additionalLeaseMs) { logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; }
// We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = newArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } }
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. intregistrySize= (int) getLocalRegistrySize(); intregistrySizeThreshold= (int) (registrySize * serverConfig.getRenewalPercentThreshold()); intevictionLimit= registrySize - registrySizeThreshold;
Randomrandom=newRandom(System.currentTimeMillis()); for (inti=0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) intnext= i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i);