Dubbo源码学习二(服务注册)


我们知道Dubbo逻辑上由以下几个模块组成(不考虑Monitor和Container)

  • Provider:服务提供方
  • Consumer:服务消费方
  • Registry:服务注册与发现的注册中心

稍微解释下Provider和Consumer只是Dubbo逻辑上的概念,对于一个业务应用,很有可能作为Provider提供服务,又作为Consumer消费服务。
Registry是服务注册中心,业内一般都用zk作为注册中心。服务提供方在启动时候把本机的ip和提供服务的名称注册到注册中心。节点名称是服务名称,节点内容是机器ip。这样如果有多个服务提供者,节点内容就会是机器ip列表。服务消费者在启动时候向注册中心订阅自己需要的服务名称,注册中心返回提供该服务的一些机器ip列表。消费者拿到ip列表根据一定的路由策略选择一台机器进行远程调用。当然服务消费者和提供者都必须与注册中心保持一个长连接,如果服务提供方宕机,注册中心会把该机器从ip列表中摘掉,并通知消费者更新服务ip列表。

本文我们主要来看下服务提供方是如何将服务注册到注册中心的。

provider demo

public interface DemoService {
    String sayHello(String name);
}

public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
    }
}
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-provider"/>
    <dubbo:registry address="zookeeper://localhost:2181?client=curator"/>
    <dubbo:protocol name="dubbo" port="20880"/>
    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>
</beans>

上述DemoService是一个我们声明的服务,DemoServiceImpl是其实现,这里我们通过xml配置文件来注册这个服务,当然Dubbo也支持通过注解过着api方法本文不讨论。这个配置使用了spring自定义的schema,声明了applicationName,注册中心是zk,通信协议是dubbo,最重要的是dubbo:service这个标签。这里我们必须知道实现自定义schema需要以下几个文件

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }
    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

spring.handlers

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

spring.schemas

http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd

dubbo.xsd(有省略)

<xsd:element name="service" type="serviceType">
    <xsd:annotation>
        <xsd:documentation><![CDATA[ Export service config ]]></xsd:documentation>
        <xsd:appinfo>
            <tool:annotation>
                <tool:exports type="org.apache.dubbo.config.ServiceConfig"/>
            </tool:annotation>
        </xsd:appinfo>
    </xsd:annotation>
</xsd:element>

这里大家有兴趣可以自己去找资料学习自定义schema。注意

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>

上述这个你可以认为把xml的dubbo:service标签定义的bean解析为一个
spring ServiceBean,这里多说一句,显然作为provider你可以通过xml配置多个dubbo:service服务,这样就会对应生成多个ServiceBean

ServiceBean

public class ServiceBean<T> extends ServiceConfig<T> implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
}

spring容器加载完成后,会回调onApplicationEvent方法,会调用到父类ServiceConfig的export()方法,export方法会调用自身的doExport()方法,这个方法会去判断ref是否泛化调用GenericService,是否定义了本地存根stub方法等。接着doExportUrls()

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

一个ServiceBean本身可以支持多种通信协议,会分别进行处理。这里我们xml只配置了dubbo协议,就只会处理dubbo协议。接着调用
doExportUrlsFor1Protocol方法。这个方法主要做了以下几件事

  • 判断dubbo:service标签有没有设置子标签dubbo:method,意思是可以针对单个服务的不同方法配置不同的参数,比如超时时间之类的。下面这个ServiceBean的uml类图也可说明dubbo:servcie的配置的控制粒度,从粗粒度的service级别到细粒度的method级别。
  • 判断该服务是本地服务还是远程服务,dubbo:service标签有个scope属性,可以配置该服务只提供本地服务InjvmProtocol(类似一个普通的spring service),也可以配置提供远程调用服务,这样就需要注册到注册中心。默认情况下,服务本地和远程都会注册。

本文只考虑远程服务,本地服务是怎么注册的可以看下InjvmProtocol。

Exporter<?> exporter = protocol.export(wrapperInvoker);

registry=org.apache.dubbo.registry.integration.RegistryProtocol

上面的protocol是Protocol$Adaptive类,前一篇讲ExtensionLoader的时候讲到过,这个可以根据
wrapperInvoker.getUrl().getProtocol来真正决定调用哪个Protocol,这里返回的是registry,那么调用的就是RegistryProtocol的export方法。(严格上来说上述过程省略了两个包装类ProtocolListenerWrapper,ProtocolFilterWrapper)

RegistryProtocol

RegistryProtocol

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    URL registryUrl = getRegistryUrl(originInvoker);

    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

    if (register) {
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

上述方法主要包含3个流程

  • 公开本地服务,配置文件配置的是Dubbo通信协议,实际上会使用Netty4作为通信协议,暴露20880端口并监听,接受客户端传来的各种请求
  • 向注册中心注册该服务,最终向zk
    /dubbo/org.apache.dubbo.demo.DemoService/providers 目录下写入URL地址(其中包括机器等信息)
  • Subscribe the override data,这个过程貌似会向路径
    /dubbo/org.apache.dubbo.demo.DemoService/configurators路径写入URL地址

上述我们主要关注前两个过程。

doLocalExport(开启本地服务)

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

这里又是根据invokerDelegete.getUrl.getProtocol获得了dubbo,最终会调用DubboProtocol的export方法。

DubboProtocol

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

上述openServer(url)方法会调用到createServer方法()

private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

上述Exchangers.bind(url, requestHandler);方法经过
HeaderExchanger.bind()==>Transporters.bind==>
NettyTransporter.bind()等最终到NettyServer.doOpen方法

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

上述过程我们在netty源码研究一(服务端启动)已经讲过,有兴趣可以看下。主要是服务端通过netty暴露20880端口并持续监听,也就是接受客户端发来的各种请求。这样就完成了Provider本地服务的注册。

registry provider(向注册中心注册服务)

上面确认服务已经开启后,下面就要向注册中心注册服务了。

RegistryProtocol

public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

上述方法经过FailbackRegistry.register,最终会调用到ZookeeperRegistry的doRegister(url)方法

@Override
protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

上述方法就是向zk /dubbo/org.apache.dubbo.demo.DemoService/providers 路径下写入本机的ip地址,这样就完成了服务注册。

总结

Dubbo实现服务注册的代码,各种Protocol$Adaptive,一眼看上去感觉绕来绕去,当然设计成这样也是为了以后更好的去通过插件式的扩展。我们可以先跳出上述代码,其实服务注册大体也就两个过程:

  • 本地首先要启动服务,这里一般都是通过Netty开启一个端口并监听,这样就可以持续监听客户端发来的请求。
  • 本地服务启动后,就向注册中心注册该服务,目前业内多用zk作为注册中心。

文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
Dubbo源码学习三(服务引用) Dubbo源码学习三(服务引用)
我们知道Dubbo逻辑上由以下几个模块组成(不考虑Monitor和Container) Provider:服务提供方 Consumer:服务消费方 Registry:服务注册与发现的注册中心 上一篇文章Dubbo源码学习二(服务注册
2018-07-29
下一篇 
Dubbo源码学习一(扩展点ExtensionLoader) Dubbo源码学习一(扩展点ExtensionLoader)
之前其实已经粗略看过Dubbo的源码,最近一个月在公司连续写了nodejs和go语言的redis cluster客户端之后,稍微有点空余时间就准备找个开源的java项目学习下。正好阿里准备对Dubbo重启开发3.0,就决定重新学习下Dubb
2018-07-25
  目录