上一篇文章我们介绍了eureka client的工作模式,我们知道eureka client是通过http rest来与eureka server交互,来注册服务,续约服务,取消服务,服务查询。所以,eureka server肯定要提供上述http的服务端实现,这也是我们本篇文章研究eureka server的一个切入点。另外我们还要关注下,server是怎么剔除一些长时间没有发送心跳的client端,server集群之间是怎么replicate的也是我们需要关注的一个重点。
server端提供的rest接口
server端提供给client端的rest接口主要包括注册服务,续约服务,取消服务,获取服务,下面我们会一个接一个的看下server端是什么实现这些接口的
server端启动注册rest接口
我们看下server端启动源码
@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {
}
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@EnableDiscoveryClient
@EnableConfigurationProperties(EurekaDashboardProperties.class)
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerConfiguration extends WebMvcConfigurerAdapter {
/**
* Register the Jersey filter
*/
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
/**
* Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
* required by the Eureka server.
*/
@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
false, environment);
// Filter to include only classes that have a particular annotation.
//
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages)
//
Set<Class<?>> classes = new HashSet<Class<?>>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
//
Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
}
和client端启动一样,重点关注@EnableEurekaServer,这个注解import了EurekaServerConfiguration配置类。EurekaServerConfiguration这个配置类有两个方法,jerseyFilterRegistration方法用来注册jersey fliter,jerseyApplication方法会加载下面这个类
之列resources类可以理解成spring mvc中的controller,用来接收client端的rest请求的,下面就逐个分析这些rest请求。
注册服务(register)
时序图如下:
可以看到ApplicationResource接收到client端的注册请求后,调用PeerAwareInstanceRegistryImpl的register方法,该方法先调用父类register方法将client端传过来的实例注册到自身,然后再同步给其他节点
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
......
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
AbstractInstanceRegistry的register方法是注册服务的真正实现,具体源码不看了,主要是通过各种锁(ReentrantReadWriteLock,synchronized)来获取待注册实例,然后保存到一个二维map
第一层map的key是应用的名称(appName),value是该应用下面所有实例组成的一个map。(比如user这个应用,生产环境肯定有多台user应用)
第二层map的key是单个应用的实例id(全集群唯一确定),value是应用info
然后看下replicateToPeers这个方法,这个方法在以下几个条件,就提前return:
- 集群中没有其他server,显然不需要replicate
- 轮训出来的节点是自身,显然也不需要通知自己
- isReplication为true,首先我们要清楚,client发起rest请求只会到一台server上,client发出的rest请求header里面isReplication为false,所以接收到client请求的某台server就会把这个信息同步给其他其他server,也是调用相同的rest接口请求其他server,只不过此时会把hader参数isReplication设置为true,就可以避免通知已经注册过的节点
private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
续约服务(renew)
同上面注册服务过程基本一样,直接画出时序图
取消服务(cancel)
同上面注册服务过程基本一样,直接画出时序图
服务列表查询(fetch registry)
比较简单,直接画出时序图
定时任务剔除失效服务(Eviction)
时序图如下:
Eviction(失效服务剔除)用来定期在Eureka Server检测失效的服务,检测标准就是超过一定时间没有Renew的服务。
默认失效时间为90秒,也就是如果有服务超过90秒没有向Eureka Server发起Renew请求的话,就会被当做失效服务剔除掉。
失效时间可以通过eureka.instance.leaseExpirationDurationInSeconds进行配置,默认90s(这个时间设置太大的话,有可能客户端会获得已经失效的服务,设置太小的话,由于网络原因,客户端心跳没有及时发过来导致被剔除。这个时间最少应该设置大于客户端心跳发送的时间)定期扫描时间可以通过eureka.server.evictionIntervalTimerInMs进行配置,默认60s。
新server加入server集群
最后再来看一下一个新的Eureka Server节点加进来,或者Eureka Server重启后,如何来做初始化,从而能够正常提供服务,整体时序图如下:
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
batchResponse.addResponse(dispatch(instanceInfo));
}
}
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
switch (instanceInfo.getAction()) {
case Register:
singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
break;
case Heartbeat:
singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
break;
case Cancel:
singleResponseBuilder = handleCancel(resource);
break;
case StatusUpdate:
singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
break;
case DeleteStatusOverride:
singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
break;
}
return singleResponseBuilder.build();
}
上面PeerReplicationResource这个类作为一个jersey resource会定时接收其他server同步过来的信息(debug调试可以知道包括其他server,或者client端的Register,Heartbeat,Cancel,StatusUpdate,DeleteStatusUpdate),然后调用dispatch方法,根据不同类型处理不同的事件。所以新的server一启动,其他server就会把已经注册的服务实例传送过来,然后对每一个实例调用上面的服务注册(register)方法。