spring-cloud-eureka源码分析一(client端)


what is eureka

先介绍下eureka是什么玩意。eureka是netflix公司开源的一款基于REST的服务自动注册和发现的产品,并且提供了java客户端给eureka client端更好的与eureka server交互。另外不得不提一下netflix这个公司,这是一家在线影片租赁商,开源了很多java的产品,其中很多产品都被收入进了spring cloud项目里面。除了上面说的eureka,还有处理熔断的hystrix,提供客户端负载均衡的ribbon,无线网关的zuul等等。下面先看一下eureka的整体架构图
eureka架构图

可以看到eueka按逻辑上可以划分为3个模块,eureka-server,eureka-client-service-provider,eureka-client-service-consumer。

  • eureka-server:服务端,提供服务注册和发现
  • eureka-client-service-provider:客户端,服务提供者,通过http rest告知服务端注册,更新,取消服务
  • eureka-client-service-consumer:客户端,服务消费者,通过http rest从服务端获取需要服务的地址列表,然后配合一些负载均衡策略(ribbon)来调用服务端服务。

值得注意的一点,不同于其他服务注册与发现(zookeeper,dubbo等其他产品需要单独以中间件的形式部署集群server),以上3个角色都是逻辑角色,甚至可以在相同的jvm进程上。@EnableDiscoveryClient注解从字面意思就可以猜到这个和服务注册发现有关,下面我们就来看下源码。

client应用启动

下面是基于spring cloud的eureka的client端启动代码

@EnableDiscoveryClient
@SpringBootApplication
public class EurekaProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaProviderApplication.class,
            args);
    }
}

可以看到基于spring boot的启动配置相当简单,这里不详细介绍spring boot了,只需知道@SpringBootApplication这个注解开启了spring boot的组件扫描和自动配置功能,SpringApplication.run方法负责启动引导应用程序。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
}

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
		extends SpringFactoryImportSelector<EnableDiscoveryClient> {
		.......
}

@CommonsLog
public abstract class SpringFactoryImportSelector<T>
		implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
	@Override
	public String[] selectImports(AnnotationMetadata metadata){
		最终返回一个["org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration"]一元数组
	}
}

其实从这个注解EnableDiscoveryClient也不难看出最终会注入DiscoveryClient,这个调用过程链异常复杂,有兴趣的可以自己debug看下调用方法堆栈。

spring cloud DiscoveryClient

其中左边的org.springframework.cloud.client.discovery.DiscoveryClient是spring cloud定义的接口,它定义了发现服务的抽象方法。
下面的EurekaDiscoveryClient是它的实现类。
右边的LookupService是netflix开源的发现服务的接口。
最终实现是com.netflix.discovery.DiscoveryClient

大家可以发现中间有个1:1的线把左右两个部分关联起来了,我们如果仔细研究spring cloud的一些源码,可以发现很多这种设计模式。spring cloud自己包装了很多wrapper类,最终实现其实都是那些开源产品自己的类。大家看下面的EurekaDiscoveryClient的源码就知道了

    public class EurekaDiscoveryClient implements DiscoveryClient {

	public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";

	private final EurekaInstanceConfig config;

	private final EurekaClient eurekaClient;

	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
		List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
				false);
		List<ServiceInstance> instances = new ArrayList<>();
		for (InstanceInfo info : infos) {
			instances.add(new EurekaServiceInstance(info));
		}
		return instances;
	}

	@Override
	public List<String> getServices() {
		Applications applications = this.eurekaClient.getApplications();
		if (applications == null) {
			return Collections.emptyList();
		}
		List<Application> registered = applications.getRegisteredApplications();
		List<String> names = new ArrayList<>();
		for (Application app : registered) {
			if (app.getInstances().isEmpty()) {
				continue;
			}
			names.add(app.getName().toLowerCase());

		}
		return names;
	}

}

显然EurekaDiscoveryClient获取服务最终会通过它的成员变量private final EurekaClient eurekaClient(也就是那条线右边的接口)来真正的调用服务端接口获取服务。

netflix DiscoveryClient

先看下DiscoveryClient的构造方法

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        ......
        try {
            scheduler = Executors.newScheduledThreadPool(3,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
            initScheduledTasks();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

可以看到这个构造方法里面,主要做了下面几件事

  • 创建了scheduler定时任务的线程池,heartbeatExecutor心跳检查线程池(服务续约),cacheRefreshExecutor(服务获取)
  • 然后initScheduledTasks()开启上面三个线程池,往上面3个线程池分别添加相应任务。然后创建了一个instanceInfoReplicator(Runnable任务),然后调用InstanceInfoReplicator.start方法,把这个任务放进上面scheduler定时任务线程池(服务注册并更新)。

服务注册(Registry)

上面说了,initScheduledTasks()方法中调用了InstanceInfoReplicator.start()方法,下面看下InstanceInfoReplicator源码

/**
 * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
 * - configured with a single update thread to guarantee sequential update to the remote server
 * - update tasks can be scheduled on-demand via onDemandUpdate()
 * - task processing is rate limited by burstSize
 * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
 *   is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
 *   on-demand update).
 *
 *   @author dliu
 */
class InstanceInfoReplicator implements Runnable {

    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    public void stop() {
        scheduler.shutdownNow();
        started.set(false);
    }

    public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

}

从源码和类的注释可以看到,这个Runnable类用来注册和更新接口,被放在了一个单线程定时任务线程池中连续不断运行。除非应用状态listener监听到应用状态发生变化,会主动调用instanceInfoReplicator.onDemandUpdate();

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent s){
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

接着上面InstanceInfoReplicator的run方法,run方法中会调用DiscoveryClient的register方法。

/**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

最终又经过一系列调用,最终会调用到AbstractJerseyEurekaHttpClient的register方法

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

可以看到最终通过http rest请求eureka server端,把应用自身的InstanceInfo实例注册给server端,我们再来完整梳理一下服务注册流程。
register流程

服务续约renew

大致过程和上面类似,不再分析源码,直接画出时序图
renew

服务下线cancel

在服务shutdown的时候,需要及时通知服务端把自己剔除,以避免客户端调用已经下线的服务
cancel

定时更新已经注册服务的地址fetchRegistry

客户端会定时每隔一段时间获取远程服务地址,然后更新本地缓存
fetchRegistry

定时更新eureka server的地址

上面可以看到不管是client-service-provider,client-service-consumer都是通过http rest来与eureka-server端交互的,所以那么client端是怎么获取server端的地址的呢?默认是从配置文件里面取得,如果需要更灵活的控制,可以通过override getEurekaServerServiceUrls方法来提供自己的实现。定期更新频率可以通过eureka.client.eurekaServiceUrlPollIntervalSeconds配置
定时更新eureka server地址


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
spring-cloud-eureka源码分析二(server端) spring-cloud-eureka源码分析二(server端)
上一篇文章我们介绍了eureka client的工作模式,我们知道eureka client是通过http rest来与eureka server交互,来注册服务,续约服务,取消服务,服务查询。所以,eureka server肯定要提供上述
2016-12-02
下一篇 
Nodejs简介 Nodejs简介
是什么Nodejs是一个为实时web应用开发而诞生的平台,它从诞生之初就考虑了在实时响应,超大规模数据要求下架构的可扩展性。它摒弃了传统平台依靠多线程来实现高并发,采用了单线程、异步式io、事件驱动式的程序设计模型(听起来是不是感觉很高大上
2016-07-03
  目录