Dubbo源码学习一(扩展点ExtensionLoader)


之前其实已经粗略看过Dubbo的源码,最近一个月在公司连续写了nodejs和go语言的redis cluster客户端之后,稍微有点空余时间就准备找个开源的java项目学习下。正好阿里准备对Dubbo重启开发3.0,就决定重新学习下Dubbo。

Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。

个人认为Dubbo之所以能让这么多人和公司关注,它的一些设计模式确实很值得学习。我们都知道java有七大设计原则,说下开闭原则OCP,对扩展开放,对修改关闭。Dubbo被那么多人和公司使用,可能每个公司都会根据自身的需求对Dubbo做些修改。Dubbo一开始就留了一些扩展点,对于一些常用的模块像注册中心,通信协议,路由等模块都提供了多种不同的实现。如果还不能满足需求的话,用户还可以自行定制。让参与者尽量黑盒扩展,而不是白盒去修改代码。

Dubbo即然要扩展,扩展点的加载方式,首先要统一,微核心+插件式,是比较能达到 OCP 原则的思路。由一个插件生命周期管理容器,构成微核心,核心不包括任何功能,这样可以确保所有功能都能被替换,并且,框架作者能做到的功能,扩展者也一定要能做到,以保证平等对待第三方,所以,框架自身的功能也要用插件的方式实现,不能有任何硬编码。

Dubbo参考了JDK标准的SPI机制,参见:java.util.ServiceLoader,开发了一套ExtensionLoader加载机制,所有需要扩展的模块都由ExtensionLoader来动态加载。

SPI(Service Provider Interface)

SPI简介

SPI的全名为Service Provider Interface。大多数开发人员可能不熟悉,因为这个是针对厂商或者插件的。例如标准的jdk JDBC,jdk本身没有实现通信协议,只是定义了一套操作数据库的接口。由不同的数据库生产厂商自己实现JDBC一套标准的接口。在面向对象的设计中,我们一般都是面对接口编程,一个接口可能有多种不同的实现,我们一般不在代码里面硬编码指定具体使用的接口实现。比如我一开始使用mysql,后来使用oracle,那么我就需要修改代码加载oracle的jdbc实现,这就违反了OCP原则。所以我们必须实现一种机制,让程序能自己寻找服务实现的机制,有点类似IOC,将对象的装配的控制权转移到程序之外。

SPI约定

当服务的提供者实现了某个接口之后,必须在META-INF/services路径下面创建一个文件名是接口全类名,文件内容是具体实现的全类名。ServiceLoader就会去扫描这个路径下面所有的文件,会加载这个实现类。

SPI demo

public static void main(String[] args) throws SQLException, ClassNotFoundException {
    //java5以上版本,不需要Class.forName("com.mysql.jdbc.Driver");
    //Class.forName("com.mysql.jdbc.Driver");
    String connectionURL = "jdbc:mysql://localhost:3306/test";
    Connection conn = DriverManager.getConnection(connectionURL, "userName", "password");
}

上述是使用jdbc连接mysql的一个demo,其实从java5版本以后,已经不需要使用Class.forName(“com.mysql.jdbc.Driver”)加载mysql驱动了。我们只要确保classpath引入了mysql-connector-java包,后面的DriverManager.getConnection会根据mysql-connector-java包里面的META-INF/services文件自动加载com.mysql.jdbc.Driver。
jdkspi

Dubbo ExtensionLoader

来源

Dubbo 的扩展点加载从 JDK 标准的 SPI (Service Provider Interface) 扩展点发现机制加强而来。
Dubbo 改进了 JDK 标准的 SPI 的以下问题:

  • JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
  • 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过 getName() 获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。
  • 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

上图可以看到dubbo默认约定文件放在META-INF/dubbo/internal下面,文件名还是接口全类名,文件内容稍有不同,不同于SPI,文件内容是key=value的形式。这个key就可以保证只加载指定的实现。

扩展点自适应AdaptiveExtension(Protocol)

我们下面就通过一个例子来学习下ExtensionLoader的内部实现。Dubbo里面对于一些重要的模块包括注册中心,通信协议,路由以及其他很多模块都是通过ExtensionLoader来加载的,这样用户可以根据自身需要选择不同的实现或者扩展某个模块。Dubbo这种设计也符合JAVA设计原则-依赖倒转原则。下面我们以通信协议模块入手

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

Dubbo里面的扩展点都是上面那样加载的,获取一个可以自适应的(根据指定配置加载需要的依赖,直到扩展点方法执行时才决定调用是一个扩展点实现)实例,先看下getExtensionLoader(Protocol.class)方法

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type +
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }

    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

这个方法比较简单,先进行一些基本的校验:必须是接口类型,接口里面必须出现SPI注解。然后从
EXTENSION_LOADERS(全局缓存一个ConcurrentMap<Class, ExtensionLoader>),
所有已经加载的ExtensionLoader都会缓存到这个map里面)里面获取ExtensionLoader实例,如果没有就new一个然后缓存到EXTENSION_LOADERS。下面看下是怎么new的ExtensionLoader

private ExtensionLoader(Class<?> type) {
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

先判断type是不是ExtensionFactory类型,如果本身就是ExtensionFactory则objectFactory设置为null,如果不是ExtensionFactory类型,则通过ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension())实例化objectFactory类型。可以看到又和上面的Protocol一样是通过ExtensionLoader.getExtensionLoader(class).getAdaptiveExtension()来加载的。这里我们主要是分析Protocol的加载,ExtensionFactory和Protocol类似,所以我们直接跳过objectFactory的初始化。接着来看下上面得到了ExtensionLoader实例后,调用getAdaptiveExtension方法。

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        } else {
            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

这个方法是获取一个自适应的扩展实例(关于扩展点的一些介绍可以参考这篇文档扩展点加载)
上述代码同样是先从缓存cachedAdaptiveInstance里面获取实例,如果没有就调用createAdaptiveExtension方法创建一个,然后缓存到cachedAdaptiveInstance。

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}
    
private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
    
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

上述方法先getExtensionClasses(),看看有没有已经存在的扩展实现类,有的话取到这些类,把标记了@Adaptive的缓存到cachedAdaptiveClass(比如AdaptiveExtensionFactory就标记了@Adaptive),把没有标记@Adaptive的扩展实现类缓存到cachedClasses。我们先顺着getExtensionClasses==>loadExtensionClasses方法看下去。

private Map<String, Class<?>> loadExtensionClasses() {
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if (defaultAnnotation != null) {
        String value = defaultAnnotation.value();
        if ((value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if (names.length == 1) cachedDefaultName = names[0];
        }
    }

    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

可以看到与JDK SPI不同,Dubbo的SPI机制从META-INF/dubbo/internal/,META-INF/dubbo/,META-INF/services/三个路径读取扩展实现类,由于捐赠给apache基金会的原因一些报的路径修改了,所以代码通过org.apache repalace com.alibaba作了兼容。接着从loadDirectory==>loadResource==>loadClass方法

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error when load extension class(interface: " +
                type + ", class line: " + clazz.getName() + "), class "
                + clazz.getName() + "is not subtype of interface.");
    }
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        if (cachedAdaptiveClass == null) {
            cachedAdaptiveClass = clazz;
        } else if (!cachedAdaptiveClass.equals(clazz)) {
            throw new IllegalStateException("More than 1 adaptive class found: "
                    + cachedAdaptiveClass.getClass().getName()
                    + ", " + clazz.getClass().getName());
        }
    } else if (isWrapperClass(clazz)) {
        Set<Class<?>> wrappers = cachedWrapperClasses;
        if (wrappers == null) {
            cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
            wrappers = cachedWrapperClasses;
        }
        wrappers.add(clazz);
    } else {
        clazz.getConstructor();
        if (name == null || name.length() == 0) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }
        String[] names = NAME_SEPARATOR.split(name);
        if (names != null && names.length > 0) {
            Activate activate = clazz.getAnnotation(Activate.class);
            if (activate != null) {
                cachedActivates.put(names[0], activate);
            }
            for (String n : names) {
                if (!cachedNames.containsKey(clazz)) {
                    cachedNames.put(clazz, n);
                }
                Class<?> c = extensionClasses.get(n);
                if (c == null) {
                    extensionClasses.put(n, clazz);
                } else if (c != clazz) {
                    throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                }
            }
        }
    }
}

上述方法逻辑是,判断type是不是扩展class的父类或者父接口,不是则抛出异常;判断扩展class里面是否出现过Adaptive注解,出现则缓存到cachedAdaptiveClass,并且只允许一个扩展class包含Adaptive注解,否则抛出异常;判断是否是WrapperClass,如果有拷贝构造函数,则判定为Wrapper扩展点 类并缓存到cachedWrapperClasses,Protocol有两个包装类。

  • filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
  • listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper

其他的既没有Adative注解,有没有拷贝构造方法,就是默认的Protocol扩展

  • org.apache.dubbo.registry.integration.RegistryProtocol
  • org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
  • org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
  • org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
  • org.apache.dubbo.rpc.support.MockProtocol

可以看到上述5个实现都没有出现过Adaptive注解,也没有拷贝构造函数,这5个会缓存到cachedClasses。所以回到getAdaptiveExtensionClass方法,如果扩展的实现没有@Adaptive标记的,则通过createAdaptiveExtensionClass方法来创建一个AdaptiveClass

private Class<?> createAdaptiveExtensionClass() {
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

上述代码逻辑是通过拼接代码生成一个AdaptiveClass的code,并默认通过javassist字节码技术动态编译一个AdaptiveClass。下图是上面的动态拼接的code字符串,可以看到生成了一个实现Protocol的Adaptive类,然后根据url里面传进来的参数决定调用对应的实现
code

AdaptiveClass创建之后,通过injectExtension注入扩展class依赖的相关类

private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

上述代码的逻辑是遍历AdaptiveClass的所有方法,看下有没有set方法注入,如果存在setter方法,则通过反射注入依赖的对象。因为通过字节码技术动态生成的Protocol AdaptiveClass没有通过set方法注入的依赖,所以该方法直接返回。

现在通过Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()就获取了一个可以自适应的Protocol。

获取扩展点getExtension(Protocol)

public T getExtension(String name) {
    if (name == null || name.length() == 0)
        throw new IllegalArgumentException("Extension name == null");
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        holder = cachedInstances.get(name);
    }
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

上述方法逻辑也是先从缓存中获取指定name的实例,如果不存在就创建一个实例,然后缓存起来。

private T createExtension(String name) {
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}

上述代码会先从之前缓存的cachedClasses获取指定name(“dubbo”)的Protocol实现org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol,接着调用之前分析过的injectExtension方法注入DubboProtocol的依赖(没有setter方法,不需要注入)。然后获取缓存中的cachedWrapperClasses,通过包装类包装DubboProtocol,最终返回一个new ProtocolListenerWrapper(new ProtocolFilterWrapper(DubboProtocol))的包装类。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
Dubbo源码学习二(服务注册) Dubbo源码学习二(服务注册)
我们知道Dubbo逻辑上由以下几个模块组成(不考虑Monitor和Container) Provider:服务提供方 Consumer:服务消费方 Registry:服务注册与发现的注册中心 稍微解释下Provider和Consum
2018-07-28
下一篇 
数据库访问层中间件Zebra 数据库访问层中间件Zebra
zebra是一个基于JDBC API协议上开发出的高可用、高性能的数据库访问层解决方案。类似阿里的tddl,zebra是一个smart客户端,提供了诸如动态配置、监控、读写分离、分库分表等功能。下图是zebra的整体架构图 zebra整体架
2017-11-29
  目录