之前其实已经粗略看过Dubbo的源码,最近一个月在公司连续写了nodejs和go语言的redis cluster客户端之后,稍微有点空余时间就准备找个开源的java项目学习下。正好阿里准备对Dubbo重启开发3.0,就决定重新学习下Dubbo。
Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。
个人认为Dubbo之所以能让这么多人和公司关注,它的一些设计模式确实很值得学习。我们都知道java有七大设计原则,说下开闭原则OCP,对扩展开放,对修改关闭。Dubbo被那么多人和公司使用,可能每个公司都会根据自身的需求对Dubbo做些修改。Dubbo一开始就留了一些扩展点,对于一些常用的模块像注册中心,通信协议,路由等模块都提供了多种不同的实现。如果还不能满足需求的话,用户还可以自行定制。让参与者尽量黑盒扩展,而不是白盒去修改代码。
Dubbo即然要扩展,扩展点的加载方式,首先要统一,微核心+插件式,是比较能达到 OCP 原则的思路。由一个插件生命周期管理容器,构成微核心,核心不包括任何功能,这样可以确保所有功能都能被替换,并且,框架作者能做到的功能,扩展者也一定要能做到,以保证平等对待第三方,所以,框架自身的功能也要用插件的方式实现,不能有任何硬编码。
Dubbo参考了JDK标准的SPI机制,参见:java.util.ServiceLoader,开发了一套ExtensionLoader加载机制,所有需要扩展的模块都由ExtensionLoader来动态加载。
SPI(Service Provider Interface)
SPI简介
SPI的全名为Service Provider Interface。大多数开发人员可能不熟悉,因为这个是针对厂商或者插件的。例如标准的jdk JDBC,jdk本身没有实现通信协议,只是定义了一套操作数据库的接口。由不同的数据库生产厂商自己实现JDBC一套标准的接口。在面向对象的设计中,我们一般都是面对接口编程,一个接口可能有多种不同的实现,我们一般不在代码里面硬编码指定具体使用的接口实现。比如我一开始使用mysql,后来使用oracle,那么我就需要修改代码加载oracle的jdbc实现,这就违反了OCP原则。所以我们必须实现一种机制,让程序能自己寻找服务实现的机制,有点类似IOC,将对象的装配的控制权转移到程序之外。
SPI约定
当服务的提供者实现了某个接口之后,必须在META-INF/services路径下面创建一个文件名是接口全类名,文件内容是具体实现的全类名。ServiceLoader就会去扫描这个路径下面所有的文件,会加载这个实现类。
SPI demo
public static void main(String[] args) throws SQLException, ClassNotFoundException {
//java5以上版本,不需要Class.forName("com.mysql.jdbc.Driver");
//Class.forName("com.mysql.jdbc.Driver");
String connectionURL = "jdbc:mysql://localhost:3306/test";
Connection conn = DriverManager.getConnection(connectionURL, "userName", "password");
}
上述是使用jdbc连接mysql的一个demo,其实从java5版本以后,已经不需要使用Class.forName(“com.mysql.jdbc.Driver”)加载mysql驱动了。我们只要确保classpath引入了mysql-connector-java包,后面的DriverManager.getConnection会根据mysql-connector-java包里面的META-INF/services文件自动加载com.mysql.jdbc.Driver。
Dubbo ExtensionLoader
来源
Dubbo 的扩展点加载从 JDK 标准的 SPI (Service Provider Interface) 扩展点发现机制加强而来。
Dubbo 改进了 JDK 标准的 SPI 的以下问题:
- JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
- 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过 getName() 获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。
- 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。
上图可以看到dubbo默认约定文件放在META-INF/dubbo/internal下面,文件名还是接口全类名,文件内容稍有不同,不同于SPI,文件内容是key=value的形式。这个key就可以保证只加载指定的实现。
扩展点自适应AdaptiveExtension(Protocol)
我们下面就通过一个例子来学习下ExtensionLoader的内部实现。Dubbo里面对于一些重要的模块包括注册中心,通信协议,路由以及其他很多模块都是通过ExtensionLoader来加载的,这样用户可以根据自身需要选择不同的实现或者扩展某个模块。Dubbo这种设计也符合JAVA设计原则-依赖倒转原则。下面我们以通信协议模块入手
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
Dubbo里面的扩展点都是上面那样加载的,获取一个可以自适应的(根据指定配置加载需要的依赖,直到扩展点方法执行时才决定调用是一个扩展点实现)实例,先看下getExtensionLoader(Protocol.class)方法
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
这个方法比较简单,先进行一些基本的校验:必须是接口类型,接口里面必须出现SPI注解。然后从
EXTENSION_LOADERS(全局缓存一个ConcurrentMap<Class>, ExtensionLoader>>),
所有已经加载的ExtensionLoader都会缓存到这个map里面)里面获取ExtensionLoader实例,如果没有就new一个然后缓存到EXTENSION_LOADERS。下面看下是怎么new的ExtensionLoader
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
先判断type是不是ExtensionFactory类型,如果本身就是ExtensionFactory则objectFactory设置为null,如果不是ExtensionFactory类型,则通过ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension())实例化objectFactory类型。可以看到又和上面的Protocol一样是通过ExtensionLoader.getExtensionLoader(class).getAdaptiveExtension()来加载的。这里我们主要是分析Protocol的加载,ExtensionFactory和Protocol类似,所以我们直接跳过objectFactory的初始化。接着来看下上面得到了ExtensionLoader
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
这个方法是获取一个自适应的扩展实例(关于扩展点的一些介绍可以参考这篇文档扩展点加载)
上述代码同样是先从缓存cachedAdaptiveInstance里面获取实例,如果没有就调用createAdaptiveExtension方法创建一个,然后缓存到cachedAdaptiveInstance。
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
上述方法先getExtensionClasses(),看看有没有已经存在的扩展实现类,有的话取到这些类,把标记了@Adaptive的缓存到cachedAdaptiveClass(比如AdaptiveExtensionFactory就标记了@Adaptive),把没有标记@Adaptive的扩展实现类缓存到cachedClasses。我们先顺着getExtensionClasses==>loadExtensionClasses方法看下去。
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
return extensionClasses;
}
可以看到与JDK SPI不同,Dubbo的SPI机制从META-INF/dubbo/internal/,META-INF/dubbo/,META-INF/services/三个路径读取扩展实现类,由于捐赠给apache基金会的原因一些报的路径修改了,所以代码通过org.apache repalace com.alibaba作了兼容。接着从loadDirectory==>loadResource==>loadClass方法
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else if (isWrapperClass(clazz)) {
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} else {
clazz.getConstructor();
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
上述方法逻辑是,判断type是不是扩展class的父类或者父接口,不是则抛出异常;判断扩展class里面是否出现过Adaptive注解,出现则缓存到cachedAdaptiveClass,并且只允许一个扩展class包含Adaptive注解,否则抛出异常;判断是否是WrapperClass,如果有拷贝构造函数,则判定为Wrapper扩展点 类并缓存到cachedWrapperClasses,Protocol有两个包装类。
- filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
- listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
其他的既没有Adative注解,有没有拷贝构造方法,就是默认的Protocol扩展
- org.apache.dubbo.registry.integration.RegistryProtocol
- org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
- org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
- org.apache.dubbo.rpc.support.MockProtocol
可以看到上述5个实现都没有出现过Adaptive注解,也没有拷贝构造函数,这5个会缓存到cachedClasses。所以回到getAdaptiveExtensionClass方法,如果扩展的实现没有@Adaptive标记的,则通过createAdaptiveExtensionClass方法来创建一个AdaptiveClass
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
上述代码逻辑是通过拼接代码生成一个AdaptiveClass的code,并默认通过javassist字节码技术动态编译一个AdaptiveClass。下图是上面的动态拼接的code字符串,可以看到生成了一个实现Protocol的Adaptive类,然后根据url里面传进来的参数决定调用对应的实现
AdaptiveClass创建之后,通过injectExtension注入扩展class依赖的相关类
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
上述代码的逻辑是遍历AdaptiveClass的所有方法,看下有没有set方法注入,如果存在setter方法,则通过反射注入依赖的对象。因为通过字节码技术动态生成的Protocol AdaptiveClass没有通过set方法注入的依赖,所以该方法直接返回。
现在通过Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()就获取了一个可以自适应的Protocol。
获取扩展点getExtension(Protocol)
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
上述方法逻辑也是先从缓存中获取指定name的实例,如果不存在就创建一个实例,然后缓存起来。
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
上述代码会先从之前缓存的cachedClasses获取指定name(“dubbo”)的Protocol实现org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol,接着调用之前分析过的injectExtension方法注入DubboProtocol的依赖(没有setter方法,不需要注入)。然后获取缓存中的cachedWrapperClasses,通过包装类包装DubboProtocol,最终返回一个new ProtocolListenerWrapper(new ProtocolFilterWrapper(DubboProtocol))的包装类。