LoginSignup
0
0

More than 1 year has passed since last update.

Eureka 注册中心源码分析

Last updated at Posted at 2022-12-07

eureka version:2.x

这篇文章不讨论Eureka如何使用,只分析Eureka作为注册中心是如何实现高可用的服务注册、服务续约、故障感知和自我保护等功能的。

  • 核心功能模块
  • 核心数据及其处理组件
  • 服务实例核心功能及算法[重点]
  • 总结

核心功能模块

Eureka_功能模块.png

Eureka注册中心是通过HTTP协议进行服务实例数据传输的。eureka-server是整个注册中心CS模型的服务端,它通过各种 properties 文件及 web.xml 创建了一个处理HTTP网络请求的WEB应用。

eureka-client-jersey2:封装了RESTful风格的网络框架Jersey,用来处理 eureka-client 的各种请求。

eureka-client:则是各种服务相关操作的发起方,包括服务注册、服务续约、服务下线和服务发现等。操作的实现则是交给 jersey 框架处理。

eureka-core:Eureka注册中心的核心模块。模块中的各种 Resource 用来接收处理eureka-client发来的HTTP请求,类似与SpringMVC的Controller。Registry[注册表] 则是处理服务实例的核心功能组件。Registry 会封装到 PeerEurekaNode 中用于集群的服务实例数据同步。

核心数据及其处理组件

EurekaServerConfig & EurekaClientConfig

EurekaServerConfig存取eureka-server的相关配置[eureka-server.properties],包括是否开启自我保护机制、续约相关配置、集群节点读取配置等。EurekaClientConfig则是存取eureka-client的相关配置[eureka-client.properties],包括是否拉取注册表、拉取服务时间间隔、连接eureka-server超时时间等相关配置。【这里只列举几个参数,实际上有大量参数配置由它们处理,大多数情况下都是使用他们的默认值】。这两个配置类的优点是通过接口的方式获取配置信息,屏蔽了内部获取配置信息的细节,见名知意,对编码过程也是相当友好。

registry

// key值为appName,每个应用对应多个服务实例,每个服务是来用实例ID区分。服务实例包含了IP地址端口号等关键信息。
ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry;

注册表[registry]是注册中心的核心数据,使用的数据结构是ConcurrentHashMap这个并发容器,存储的是注册信息包装在InstanceInfo对象中。InstanceInfo封装在Lease这个对象中是为了处理服务注册、下线和续约操作的。

PeerAwareInstanceRegistry

注册组件[PeerAwareInstanceRegistry]作为注册中心的核心组件,与服务相关的核心功能都与之相关,下面的类图很容易看出其包含的核心能力。
注册表

LeaseManager:定义服务注册(register)、服务下线(cancel)、服务续约(renew)以及服务淘汰(evict)功能。

LookupService:定义了获取服务实例功能接口(getApplication[s])。

InstanceRegistry:定义了服务实例的各种操作,如开启服务、关闭服务、服务实例状态更新、清除服务注册信息等等。

PeerAwareInstanceRegistry:定义同步集群节点的注册信息(syncUp),状态更新(statusUpdate)等功能。

服务实例核心功能及算法[关键部分]

服务注册流程

// @DiscoveryClient.java
// 1、调用注册方法。
register()

// 2、内部调用前面提到的HTTP网络请求功能模块`eureka-client-jersey`的注册方法,向 eureka server 发送网络请求
eurekaTransport.registrationClient.register(instanceInfo)

// -->> 网络传输

// @ApplicationResource.java
// 3、通过WEB应用servlet的转发,请求由`eureka-core`模块的Resource组件的addInstance()方法处理
public Response addInstance(InstanceInfo info, ...) { // 省略
    // ...
    // 4、注册组件registry处理注册实例数据
    registry.register(info, "true".equals(isReplication));
    return ...OK; 
}

在服务注册过程中 eureka-server 处理核心数据的PeerAwareInstanceRegistry,主要做了两个事情:注册数据到本机注册表+复制信息到其他集群节点。

// 注册数据到本机注册表
super.register(info, leaseDuration, isReplication);

// 复制到其他节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);

注册数据到本机注册表时主要做了什么操作?

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // 读锁来并发处理注册
    read.lock();
    
    try {
        // 添加注册信息的引用到注册表registry,注意这里是“引用”
        final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
        gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
        
        // 新注册的服务需要更新需要续约的服务数量
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {                        
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
            // 更新续约数量的阈值[后面会具体看这个算法]
            updateRenewsPerMinThreshold();
        }
        
        // 这里才是真正将注册信息放入注册表
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        gMap.put(registrant.getId(), lease);
        
        // ...
        } finally {
            read.unlock();
        }
    }

抓取注册信息&多级缓存机制

获取注册信息分为全量获取[eureka client启动或者配置设置为只进行全量获取]和增量获取[Delta]。除了启动时发起获取请求,在一定的周期内eureka client还需要不断的获取更新服务实例信息,所有eureka client在初始化的时候就创建了定时调度任务专门做这个事情。实际上,有三个定时调度任务被创建。

  • 定时获取更新服务实例信息的cacheRefreshTask
  • 心跳[续约服务实例]的heartbeatTask
  • 用于更新本地实例信息并将其复制到远程服务器的任务InstanceInfoReplicator
// 1
cacheRefreshTask = new TimedSupervisorTask(...,
    new CacheRefreshThread());
// 默认30秒调度一次
scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS); 

// 2
heartbeatTask = new TimedSupervisorTask(...,
        new HeartbeatThread());
// 默认30秒调度一次
scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);

// 3 这和前面定时调度方式稍微不一样,这个调度是操作完后再发起一个延时任务,如此循环往复。
instanceInfoReplicator.start(...);
// this==instanceInfoReplicator
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
public void run() {
    // ...
    // finally 
    Future next = scheduler.schedule(this,replicationIntervalSeconds, TimeUnit.SECONDS);
}

获取注册表的过程和注册服务类似,都是有eureka client发起,通过transport的HTTP框架发起获取注册表网路请求,然后server端由ApplicationsResource接收处理,最终由[registry]的缓存组件来返回服务注册信息。流程图如下:
eureka_002_抓取注册信息-多级缓存的注册表.png

什么情况下获取全量注册表信息,什么时候获取增量?

// 1、配置文件设置 disableDelta=ture
if (clientConfig.shouldDisableDelta()
    // 2、or 配置文件未设置
    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
    // 3、or 需要强行拉取全量注册信息[如初始化client的时候]
    || forceFullRegistryFetch
    // 4、本地没有任何服务实例信息的时候
    || (applications == null)
    // 5、从eureka server获取的应用信息为空时
    || (applications.getRegisteredApplications().size() == 0)
    // 6、版本不兼容的时候
    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
        // 全量获取
        getAndStoreFullRegistry();
    } else {
        // 增量获取
        getAndUpdateDelta(applications);
}

eureka server 的缓存组件是如何区别增量获取和全量获取?

// 缓存组件为 responseCache
private final ResponseCache responseCache;

// 通过Key获取缓存数据,增量获取和全量获取的key值不一样。
// 全量获取key
Key cacheKey = new Key(..., ResponseCacheImpl.ALL_APPS,...);
     
// 增量获取key
Key cacheKey = new Key(..., ResponseCacheImpl.ALL_APPS_DELTA,...);
responseCache.get(cacheKey)

// ResponseCache.java
// >>> getValue()
if (useReadOnlyCache) { // eureka-server配置文件设置是否读只读缓存
    final Value currentPayload = readOnlyCacheMap.get(key);
    if (currentPayload != null) {
        // 只读缓存有值则返回
        payload = currentPayload;
    } else {
        // 否则,获取读写缓存的值
        payload = readWriteCacheMap.get(key);
        // 并放到只读缓存
        readOnlyCacheMap.put(key, payload);
    }   
} else {
    payload = readWriteCacheMap.get(key);
}
return payload;

多级缓存的更新时机?
eureka_003_多级缓存的更新时机.png

eureka server 在初始化注册组件PeerAwareInstanceRegistry的时候创建了二级缓存组件ResponseCache,缓存容器分别是类型为ConcurrentMap的只读缓存和由guava的LoadingCache实现的带过期的读写缓存。其工作原理是:

  1. 读写缓存readWriteCacheMap设置了默认180秒过期机制,那么在每次获取读写缓存的时候如果有值直接返回,如果没有则重新去注册表获取最新的注册信息。当然会通过key值来判断获取全量数据还是增量数据。然后180秒过期后会重新去获取。
  2. 定时任务会每隔30秒将读写缓存的数据同步给只读缓存,以保证数据的实时性。
  3. 当 eureka client 获取注册表信息的时候,server端首先去只读缓存获取。如果获取不到数据则从读写缓存获取,然后将获取到的数据同步给只读缓存并返回。
// 读写缓存
private final LoadingCache<Key, Value> readWriteCacheMap;
// 只读缓存
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

// ...

// 读写缓存创建
this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(...)
    // 过期时间设置
    .expireAfterWrite(...)
    // 移除监听
    .removalListener(...)
    .build(new CacheLoader<Key, Value>() {
        @Override
        public Value load(Key key) {
            // 数据过期或者没有值时将重新获取注册信息
            Value value = generatePayload(key);
            return value;
        }
    }
});

// 定时任务
return new TimerTask() {
    @Override
    public void run() {
        // 数据同步
        Value cacheValue = readWriteCacheMap.get(key);
        Value currentCacheValue = readOnlyCacheMap.get(key);
        if (cacheValue != currentCacheValue) {
            // 数据不同则更新
            readOnlyCacheMap.put(key, cacheValue);
        }
    }
}

多级缓存过期机制

多级缓存的过期方式有三种,包括主动过期、定时过期和被动过期。主动过期发生在服务注册的时候,如果注册的服务之前有数据在缓存中,则主动过期掉,缓存组件下次再去注册表获取最新数据。定时过期就是指定Guava CacheBuilder的定时过期时间,默认是180s。被动过期就是上面的TimerTask做的事情。

// 1. 主动过期
// 在服务注册的时候注册组件会主动调用过期方法
public void register(...) {
    // do register
    
    // 过期缓存,将缓存中的数据过期掉,在获取缓存的时候重新去注册表拉取最新数据
    invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
}
readWriteCacheMap.invalidate(key);

// ...
// 缓存组件responseCache过期方法
responseCache.invalidate(appName, vipAddress, secureVipAddress);

增量获取&一致性Hash

前面提到在EurekaClient启动的时候会初始化三个调度任务,其中cacheRefreshTask就是会每隔30s去获取最新的注册表信息,当获取注册信息的方式是增量获取[Delta]的的时候。client 首先将增量信息同步到本地,然后比较当前本地的信息是否与 server 端的数据一致。比较的方式便是一致性Hash算法。每次增量获取的Applications[包装了所有的注册信息],都会带上server端的hash值,该hash值是通过注册表中所有注册信息计算出来的具有唯一性。当 client 端同步完数据后通过计算hash值与server端的hash值进行比对就能确认信息的完整性。如果发现不一致则全量获取最新数据。核心代码如下:

// 1、定时任务获取增量注册表信息
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
void refreshRegistry() {
    fetchRegistry(remoteRegionsModified);
}

// 2、Delta 增量
getAndUpdateDelta(applications);
// getDelta方法与获取注册表类似
eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());

// 3、更新applications
if (fetchRegistryUpdateLock.tryLock()) {
    try {        
    updateDelta(delta);
    // 计算hash值
    reconcileHashCode = getReconcileHashCode(applications);
    } finally {
        fetchRegistryUpdateLock.unlock();        
    }
                
// 4、比较hash值
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
    // 5、不匹配则重新全量获取
    reconcileAndLogDifference(delta, reconcileHashCode);              
}

心跳机制

心跳机制即服务实例提供方告知注册中心我当前还在提供服务。心跳续约与服务注册的处理流程差不多,首先是client发送sendHeartBeat的HTTP的请求,然后server的InstanceResource的接收请求,转交给注册组件register处理续约。处理方法很简单主要是将注册表中需要续约的服务实例拿出来将续约信息中的最近更新时间戳更新一下,然后再将更新的信息同步给其他集群节点。

// client端发送请求[默认30秒发一次]

httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);

// server端处理请求

@PUT
public Response renewLease(...) {
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
}
// ...
// 续约信息更新
super.renew(appName, id, isReplication)
leaseToRenew.renew();
// weird!
lastUpdateTimestamp = System.currentTimeMillis() + duration;
// 集群节点同步
replicateToPeers(Action.Heartbeat, appName, id, null, null, 2022年3月19日);

服务下线&实例摘除机制

服务信息相关的操作大致上都差不多:client 通过Jersey框架发起HTTP请求,server 接收请求交给注册组件[Registry]处理注册表信息。处理完本地数据后同步给其他集群节点。与心跳机制更新时间戳的方式类似,服务下线的本质也是设置了一个服务下线的时间evictionTimestamp。最后再过期掉缓存中的数据。

// server 端
// InstanceResource.java
@DELETE
public Response cancelLease(...) {
    registry.cancel(app.getName(), id,
                "true".equals(isReplication));
}

protected boolean internalCancel(...) {
    // 1、从注册表中移除
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    leaseToCancel = gMap.remove(id);
    
    // 2、leaseToCancel.cancel();
    leaseToCancel.cancel();
    internalCancel(appName, id, false);
    evictionTimestamp = System.currentTimeMillis();
    
    // 3、过期缓存
    invalidateCache(appName, vip, svip);
}

// 4、集群同步
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);

服务实例摘除的依据就是evictionTimestamp。每次摘除的数量是有限制的注册数 - (注册数 * 0.85),当然可以配置。这样做的好处是在自我保护机制生效之前不至于清除所有的服务实例。

registry.openForTraffic(applicationInfoManager, registryCount);

// 1、每隔60s会调度定时任务
// EvictionTask
public void run() {
    // 计算一下补偿时间
    // 补偿时间为系统时钟偏斜+GC的时间
    long compensationTimeMs = getCompensationTimeMs();
    evict(compensationTimeMs);
}

evict(long additionalLeaseMs) {
    // 2、判断是否过期,并收集过期实例
    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
        expiredLeases.add(lease);
    }
    
    // 设置摘除实例数阈值:注册数 - (注册数 * 0.85)
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    
    // 3、公平洗牌算法摘除过期实例,均匀摘除
    Random random = new Random(System.currentTimeMillis());
    for (int i = 0; i < toEvict; i++) {
        // Pick a random item (Knuth shuffle algorithm) 公平洗牌算法
        int next = i + random.nextInt(expiredLeases.size() - i);
        Collections.swap(expiredLeases, i, next);
        Lease<InstanceInfo> lease = expiredLeases.get(i);
        
        // 4、调用上面的服务摘除方法
        internalCancel(appName, id, false);
    }
}

过期检查,过期的依据满足下面两点的任意一种情况:

  • 主动下线,evictionTimestamp已经设置。
  • 应该发心跳续约的时间段没有发。
public boolean isExpired(long additionalLeaseMs) {
    // 最近更新时间+配置的续约间隔时间`duration`+补偿时间[系统时钟偏斜+GC的时间]
    // 例如: lastUpdateTimestamp = 01:00:00 , 补偿时间5s
    // 那么: 01:00:00 + 90s + 5s = 01:01:35 早于当前时间就算过期
    // 默认的续约时间间是30s,那么就是三次没有发送心跳了。
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

自动故障感知&自我保护机制

当 eureka server 在定时调度过期任务的时候,发现过期的数量超过一定比例时[大量服务没有发送心跳过来]就会认为是自己的网络出现了故障,这个时候服务器停止摘除任何服务实例,这就是自我保护机制。触发的算法代码如下:

@Override
public boolean isLeaseExpirationEnabled() {
    // 1、配置文件是否启用了自我保护模式
    if (!isSelfPreservationModeEnabled()) { 
            return true;
    }
    // 2.1 续约数量为0开启自我保护
    // 2.1 每分钟续约数量大于0但小于期望的数据开启自我保护
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

// 计算numberOfRenewsPerMinThreshold
// 期望的注册数 * (60.0/30.0) * 0.85
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
// 服务注册时[register] 期望的注册数++
// 服务下显示[cancel]   期望的注册数--

// 计算每分钟续约数量
@Override
public long getNumOfRenewsInLastMin() {
    return renewsLastMin.getCount();
}
// renewsLastMin.getCount()
public long getCount() {
    return lastBucket.get();
}
// 定时调度任务,一分钟执行一次
// cas 操作
// 老数据放到lastBucket,新数据设置为0
lastBucket.set(currentBucket.getAndSet(0));

// renewsLastMin 数量增加,每次续约的时候就调用一次
public void increment() {
    currentBucket.incrementAndGet();
}

集群同步&三层队列批量处理

集群处理的管理工具类是PeerEurekaNodes。管理工具对象管理所有的集群节点信息同步。每一个PeerEurekaNode的对象代表一个集群节点,PeerEurekaNode对象的replicationClient组件处理信息同步的网络请求。

Eureka_003_任务批处理.png

总结

Eureka 整体架构比较清晰,代码相对精简。部分地方杂糅了一些Amazon的代码,不是特别优雅。功能上,服务注册的网络交互采用的HTTP协议,理解和使用起来都相对简单。接口式的获取配置信息相对优雅,可配置参数相当丰富。注册表多级缓存的机制使读写分离,也可以看出 eureka 实现的是最终一致性。eureka 集群节点是平等的[非主从式],部分节点出现故障剩余的节点依然可以提供服务,这样则实现了高可用。集群同步则是使用的批量操作的方式,减少了网络传输。eureka 使用大量的调度任务及并发容器来动态处理核心数据,确保了并发情况下数据访问的安全性。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0