spring-cloud-eureka源码分析二(server端)


上一篇文章我们介绍了eureka client的工作模式,我们知道eureka client是通过http rest来与eureka server交互,来注册服务,续约服务,取消服务,服务查询。所以,eureka server肯定要提供上述http的服务端实现,这也是我们本篇文章研究eureka server的一个切入点。另外我们还要关注下,server是怎么剔除一些长时间没有发送心跳的client端,server集群之间是怎么replicate的也是我们需要关注的一个重点。

server端提供的rest接口

server端提供给client端的rest接口主要包括注册服务,续约服务,取消服务,获取服务,下面我们会一个接一个的看下server端是什么实现这些接口的

server端启动注册rest接口

我们看下server端启动源码

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerConfiguration.class)
public @interface EnableEurekaServer {
}

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@EnableDiscoveryClient
@EnableConfigurationProperties(EurekaDashboardProperties.class)
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerConfiguration extends WebMvcConfigurerAdapter {
	/**
	 * Register the Jersey filter
	 */
	@Bean
	public FilterRegistrationBean jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean bean = new FilterRegistrationBean();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

	/**
	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
	 * required by the Eureka server.
	 */
	@Bean
	public javax.ws.rs.core.Application jerseyApplication(Environment environment,
			ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		//
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		//
		Set<Class<?>> classes = new HashSet<Class<?>>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
						resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}

		// Construct the Jersey ResourceConfig
		//
		Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);

		return rc;
	}
}

和client端启动一样,重点关注@EnableEurekaServer,这个注解import了EurekaServerConfiguration配置类。EurekaServerConfiguration这个配置类有两个方法,jerseyFilterRegistration方法用来注册jersey fliter,jerseyApplication方法会加载下面这个类
jersery resources
之列resources类可以理解成spring mvc中的controller,用来接收client端的rest请求的,下面就逐个分析这些rest请求。

注册服务(register)

时序图如下:
注册服务时序图

可以看到ApplicationResource接收到client端的注册请求后,调用PeerAwareInstanceRegistryImpl的register方法,该方法先调用父类register方法将client端传过来的实例注册到自身,然后再同步给其他节点

@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        ......
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(),   info.getId(), info, null, isReplication);
    }    

AbstractInstanceRegistry的register方法是注册服务的真正实现,具体源码不看了,主要是通过各种锁(ReentrantReadWriteLock,synchronized)来获取待注册实例,然后保存到一个二维map
第一层map的key是应用的名称(appName),value是该应用下面所有实例组成的一个map。(比如user这个应用,生产环境肯定有多台user应用)
第二层map的key是单个应用的实例id(全集群唯一确定),value是应用info

然后看下replicateToPeers这个方法,这个方法在以下几个条件,就提前return:

  • 集群中没有其他server,显然不需要replicate
  • 轮训出来的节点是自身,显然也不需要通知自己
  • isReplication为true,首先我们要清楚,client发起rest请求只会到一台server上,client发出的rest请求header里面isReplication为false,所以接收到client请求的某台server就会把这个信息同步给其他其他server,也是调用相同的rest接口请求其他server,只不过此时会把hader参数isReplication设置为true,就可以避免通知已经注册过的节点
private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

续约服务(renew)

同上面注册服务过程基本一样,直接画出时序图
renew

取消服务(cancel)

同上面注册服务过程基本一样,直接画出时序图
renew

服务列表查询(fetch registry)

比较简单,直接画出时序图
renew

定时任务剔除失效服务(Eviction)

时序图如下:
Eviction

Eviction(失效服务剔除)用来定期在Eureka Server检测失效的服务,检测标准就是超过一定时间没有Renew的服务。

默认失效时间为90秒,也就是如果有服务超过90秒没有向Eureka Server发起Renew请求的话,就会被当做失效服务剔除掉。

失效时间可以通过eureka.instance.leaseExpirationDurationInSeconds进行配置,默认90s(这个时间设置太大的话,有可能客户端会获得已经失效的服务,设置太小的话,由于网络原因,客户端心跳没有及时发过来导致被剔除。这个时间最少应该设置大于客户端心跳发送的时间)定期扫描时间可以通过eureka.server.evictionIntervalTimerInMs进行配置,默认60s。

新server加入server集群

最后再来看一下一个新的Eureka Server节点加进来,或者Eureka Server重启后,如何来做初始化,从而能够正常提供服务,整体时序图如下:
新server

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    for (ReplicationInstance instanceInfo :  replicationList.getReplicationList()) {
         batchResponse.addResponse(dispatch(instanceInfo));
    }
}

private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
        switch (instanceInfo.getAction()) {
            case Register:
                singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                break;
            case Heartbeat:
                singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                break;
            case Cancel:
                singleResponseBuilder = handleCancel(resource);
                break;
            case StatusUpdate:
                singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                break;
            case DeleteStatusOverride:
                singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                break;
        }
        return singleResponseBuilder.build();
    }

上面PeerReplicationResource这个类作为一个jersey resource会定时接收其他server同步过来的信息(debug调试可以知道包括其他server,或者client端的Register,Heartbeat,Cancel,StatusUpdate,DeleteStatusUpdate),然后调用dispatch方法,根据不同类型处理不同的事件。所以新的server一启动,其他server就会把已经注册的服务实例传送过来,然后对每一个实例调用上面的服务注册(register)方法。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
spring-kafka源码分析一(Producer) spring-kafka源码分析一(Producer)
spring-kafka是运用spring的概念基于apache kafka(linkedin开源已经捐献给apache基金会)消息解决方案开发的一个java client端。它提供了一些接口来更方便收发消息与kafka server端交互
2016-12-17
下一篇 
spring-cloud-eureka源码分析一(client端) spring-cloud-eureka源码分析一(client端)
what is eureka先介绍下eureka是什么玩意。eureka是netflix公司开源的一款基于REST的服务自动注册和发现的产品,并且提供了java客户端给eureka client端更好的与eureka server交互。另外
2016-12-01
  目录