what is eureka
先介绍下eureka是什么玩意。eureka是netflix公司开源的一款基于REST的服务自动注册和发现的产品,并且提供了java客户端给eureka client端更好的与eureka server交互。另外不得不提一下netflix这个公司,这是一家在线影片租赁商,开源了很多java的产品,其中很多产品都被收入进了spring cloud项目里面。除了上面说的eureka,还有处理熔断的hystrix,提供客户端负载均衡的ribbon,无线网关的zuul等等。下面先看一下eureka的整体架构图
可以看到eueka按逻辑上可以划分为3个模块,eureka-server,eureka-client-service-provider,eureka-client-service-consumer。
- eureka-server:服务端,提供服务注册和发现
- eureka-client-service-provider:客户端,服务提供者,通过http rest告知服务端注册,更新,取消服务
- eureka-client-service-consumer:客户端,服务消费者,通过http rest从服务端获取需要服务的地址列表,然后配合一些负载均衡策略(ribbon)来调用服务端服务。
值得注意的一点,不同于其他服务注册与发现(zookeeper,dubbo等其他产品需要单独以中间件的形式部署集群server),以上3个角色都是逻辑角色,甚至可以在相同的jvm进程上。@EnableDiscoveryClient注解从字面意思就可以猜到这个和服务注册发现有关,下面我们就来看下源码。
client应用启动
下面是基于spring cloud的eureka的client端启动代码
@EnableDiscoveryClient
@SpringBootApplication
public class EurekaProviderApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaProviderApplication.class,
args);
}
}
可以看到基于spring boot的启动配置相当简单,这里不详细介绍spring boot了,只需知道@SpringBootApplication这个注解开启了spring boot的组件扫描和自动配置功能,SpringApplication.run方法负责启动引导应用程序。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
}
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
extends SpringFactoryImportSelector<EnableDiscoveryClient> {
.......
}
@CommonsLog
public abstract class SpringFactoryImportSelector<T>
implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
@Override
public String[] selectImports(AnnotationMetadata metadata){
最终返回一个["org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration"]一元数组
}
}
其实从这个注解EnableDiscoveryClient也不难看出最终会注入DiscoveryClient,这个调用过程链异常复杂,有兴趣的可以自己debug看下调用方法堆栈。
spring cloud DiscoveryClient
其中左边的org.springframework.cloud.client.discovery.DiscoveryClient是spring cloud定义的接口,它定义了发现服务的抽象方法。
下面的EurekaDiscoveryClient是它的实现类。
右边的LookupService是netflix开源的发现服务的接口。
最终实现是com.netflix.discovery.DiscoveryClient。
大家可以发现中间有个1:1的线把左右两个部分关联起来了,我们如果仔细研究spring cloud的一些源码,可以发现很多这种设计模式。spring cloud自己包装了很多wrapper类,最终实现其实都是那些开源产品自己的类。大家看下面的EurekaDiscoveryClient的源码就知道了
public class EurekaDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
private final EurekaInstanceConfig config;
private final EurekaClient eurekaClient;
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
@Override
public List<String> getServices() {
Applications applications = this.eurekaClient.getApplications();
if (applications == null) {
return Collections.emptyList();
}
List<Application> registered = applications.getRegisteredApplications();
List<String> names = new ArrayList<>();
for (Application app : registered) {
if (app.getInstances().isEmpty()) {
continue;
}
names.add(app.getName().toLowerCase());
}
return names;
}
}
显然EurekaDiscoveryClient获取服务最终会通过它的成员变量private final EurekaClient eurekaClient(也就是那条线右边的接口)来真正的调用服务端接口获取服务。
netflix DiscoveryClient
先看下DiscoveryClient的构造方法
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
......
try {
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
可以看到这个构造方法里面,主要做了下面几件事
- 创建了scheduler定时任务的线程池,heartbeatExecutor心跳检查线程池(服务续约),cacheRefreshExecutor(服务获取)
- 然后initScheduledTasks()开启上面三个线程池,往上面3个线程池分别添加相应任务。然后创建了一个instanceInfoReplicator(Runnable任务),然后调用InstanceInfoReplicator.start方法,把这个任务放进上面scheduler定时任务线程池(服务注册并更新)。
服务注册(Registry)
上面说了,initScheduledTasks()方法中调用了InstanceInfoReplicator.start()方法,下面看下InstanceInfoReplicator源码
/**
* A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
* - configured with a single update thread to guarantee sequential update to the remote server
* - update tasks can be scheduled on-demand via onDemandUpdate()
* - task processing is rate limited by burstSize
* - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
* is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
* on-demand update).
*
* @author dliu
*/
class InstanceInfoReplicator implements Runnable {
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public void stop() {
scheduler.shutdownNow();
started.set(false);
}
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
从源码和类的注释可以看到,这个Runnable类用来注册和更新接口,被放在了一个单线程定时任务线程池中连续不断运行。除非应用状态listener监听到应用状态发生变化,会主动调用instanceInfoReplicator.onDemandUpdate();
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent s){
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
接着上面InstanceInfoReplicator的run方法,run方法中会调用DiscoveryClient的register方法。
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
最终又经过一系列调用,最终会调用到AbstractJerseyEurekaHttpClient的register方法
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
可以看到最终通过http rest请求eureka server端,把应用自身的InstanceInfo实例注册给server端,我们再来完整梳理一下服务注册流程。
服务续约renew
大致过程和上面类似,不再分析源码,直接画出时序图
服务下线cancel
在服务shutdown的时候,需要及时通知服务端把自己剔除,以避免客户端调用已经下线的服务
定时更新已经注册服务的地址fetchRegistry
客户端会定时每隔一段时间获取远程服务地址,然后更新本地缓存
定时更新eureka server的地址
上面可以看到不管是client-service-provider,client-service-consumer都是通过http rest来与eureka-server端交互的,所以那么client端是怎么获取server端的地址的呢?默认是从配置文件里面取得,如果需要更灵活的控制,可以通过override getEurekaServerServiceUrls方法来提供自己的实现。定期更新频率可以通过eureka.client.eurekaServiceUrlPollIntervalSeconds配置