DubboSPI贯穿在整个Dubbo的代码中,所以很有必要详细了解SPI的原理。
SpringFactoriesLoader
org.springframework.core.io.support.SpringFactoriesLoader#loadSpringFactories
public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories"; private static Map<String, List<String>> loadSpringFactories(@Nullable ClassLoader classLoader) { MultiValueMap<String, String> result = cache.get(classLoader); if (result != null) { return result; } try { Enumeration<URL> urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); result = new LinkedMultiValueMap<>(); while (urls.hasMoreElements()) { URL url = urls.nextElement(); UrlResource resource = new UrlResource(url); Properties properties = PropertiesLoaderUtils.loadProperties(resource); for (Map.Entry<?, ?> entry : properties.entrySet()) { String factoryTypeName = ((String) entry.getKey()).trim(); for (String factoryImplementationName : StringUtils.commaDelimitedListToStringArray((String) entry.getValue())) { result.add(factoryTypeName, factoryImplementationName.trim()); } } } cache.put(classLoader, result); return result; } catch (IOException ex) { throw new IllegalArgumentException("Unable to load factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex); } }上述代码就是从META-INF/spring.factories中获取我们需要的配置信息,spring.factories的内容大致长这样
# PropertySource Loaders org.springframework.boot.env.PropertySourceLoader=\ org.springframework.boot.env.PropertiesPropertySourceLoader,\ org.springframework.boot.env.YamlPropertySourceLoader # Run Listeners org.springframework.boot.SpringApplicationRunListener=\ org.springframework.boot.context.event.EventPublishingRunListener # Error Reporters org.springframework.boot.SpringBootExceptionReporter=\ org.springframework.boot.diagnostics.FailureAnalyzers但是他比JAVA SPI更好的点在于不会一次加载所有的类,而是根据Key进行加载
到之后会按照我们从配置文件中获取的结果,来装配对应的bean,本文不赘述spring的spi原理
了解 Dubbo 里面的 SPI 机制之前,我们先了解下 Java 提供的 SPI(service provider interface)机制,SPI 是 JDK 内置的一种服务提供发现机制。
目前市面上有很多框架都是用它来做服务的扩展发现。
简单来说,它是一种动态替换发现的机制。
举个简单 的例子,我们想在运行时动态给它添加实现,你只需要添加一个实现,然后把新的实现描述给 JDK 知道就行了。大家耳熟能详的如 JDBC、日志框架都有用到。
SPI很多地方都在使用,最常用的就是JDBC驱动
JDK 本身提供了数据访问的 api。
在 java.sql 这个包里面
我们在连接数据库的时候,一定需要用到 java.sql.Driver 如下
java.sql.DriverManager#getConnection(String, Properties, Class<?>)
Connection con = aDriver.driver.connect(url, info); 因为我们在实际应用中用的比较多的是 mysql,所以我们去 mysql 的包里面看到一个如下的目录结构
这个文件里面写的就是 mysql 的驱动实现。文件内容为com.mysql.cj.jdbc.Driver
通过 SPI 机制把java.sql.Driver 和 mysql 的驱动做了集成。
这样就达到了各个数据库厂商自己去实现数据库连接,jdk 本身不关心你怎么实现。
那JDBC是如何加载的呢
在这个方法中通过
private static void loadInitialDrivers() { String drivers; try { drivers = AccessController.doPrivileged(new PrivilegedAction<String>() { public String run() { return System.getProperty("jdbc.drivers"); } }); } catch (Exception ex) { drivers = null; } AccessController.doPrivileged(new PrivilegedAction<Void>() { public Void run() { ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class); Iterator<Driver> driversIterator = loadedDrivers.iterator(); try{ while(driversIterator.hasNext()) { driversIterator.next(); } } catch(Throwable t) { // Do nothing } return null; } }); println("DriverManager.initialize: jdbc.drivers = " + drivers); if (drivers == null || drivers.equals("")) { return; } String[] driversList = drivers.split(":"); println("number of Drivers:" + driversList.length); for (String aDriver : driversList) { try { println("DriverManager.Initialize: loading " + aDriver); Class.forName(aDriver, true, ClassLoader.getSystemClassLoader()); } catch (Exception ex) { println("DriverManager.Initialize: load failed: " + ex); } } }此方法来获取jar包中获取到的driver从而加载到类加载器中等待使用
参考文档地址
org.apache.dubbo.common.extension.ExtensionLoader#loadExtensionClasses
Dubbo 的 SPI 扩展机制,有两个规则
需要在 resource 目录以下其中一个目录中创建一个文件 META-INF/dubboMETA-INF/dubbo/internalMETA-INF/services 文件名称和接口名称保持一致,文件内容和 SPI 有差异,内容是 KEY 对应 ValueDubbo 针对的扩展点非常多
协议、调用拦截、引用监听、暴露扩展等几乎所有的功能都能应用自己的扩展
在目录resources/META-INF/dubbo下创建文件org.apache.dubbo.rpc.Protocol
文件内容为
myProtocol=com.zzjson.dubbo.consumer.MyProtocol public class MyProtocol implements Protocol { @Override public int getDefaultPort() { return 9527; } 。。。 }发现成功的获取到了自己定义的扩展点
总的来说,思路和 SPI 是差不多。都是基于约定的路径下制定配置文件。通过配置的方式轻松实现功能的扩展。
我们的猜想是,一定有一个地方通过读取指定路径下的所有文件进行 load。然后讲对应的结果保存到一个 map 中,key 对应为 名称,value 对应为实现类。那么这个实现,一定就在 ExtensionLoader 中了。
接下来我们就可以基于这个猜想去看看代码的实现。
大家要思考一个问题,所谓的扩展点,就是通过指定目录下配置一个对应接口的实现类,然后程序会进行查找和解析,找到对应的扩展点。那么这里就涉及到两个问题
怎么解析被加载的类如何存储和使用SPI的类都使用了注解@SPI
@SPI("dubbo") public interface Protocol{}对Extension做了缓存,存储在了ConcurrentMap中
如果当前的 type=ExtensionFactory,那么 objectFactory=null,
否则会创建一个自适应扩展点给到 objectFacotry,自适应扩展点是什么我们会在下文解释
这个方法就是根据一个名字来获得一个对应类的实例,所以我们来猜想一下,回想一下前面咱们配置的自定义协议,name 实际 上就是 myprotocol,而返回的实现类应该就是 MyProtocol。
defaultExtension就是我们在@SPI注解中默认的值
@SPI("spring") public interface Container {}这一块都是判断参数是否正确,继续查找重载空构造方法
这个方法,会查找指定目录
/META-INF/dubboMETA-INF/dubbo/META-INF/dubbo/internal/ 中对应的 type上文中的 Protocol 的 properties 文件,然后扫描这个文件下的所有配置信息。
然后保存到一个 HashMap中
key=对应 protocol 文件中配置的 myprotocolvalue=对应配置的类的实例 这个方法是用来实现依赖注入的,如果被加载的实例中,有成员属性本身也是一个扩展点,并且没有被标识DisableInject
则会通过 set 方法进行注入。
依赖注入也是dubbo一个更加强大的点
自适应扩展点
有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾。拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo 通过自适应拓展机制很好的解决了。
什么叫自适应扩展点呢?
我们先演示一个例子,在下面这个例子中,我们传入一个 Compiler 接口,它会返回一个 AdaptiveCompiler。
这个就叫自适应。
Compiler compiler=ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension(); System.out.println(compiler.getClass()); 它是怎么实现的呢?
我们根据返回的 AdaptiveCompiler 这个类,看到这个类上面有一个注解@Adaptive。 这个就是一个自适应 扩展点的标识。
@Adaptive public class AdaptiveCompiler implements Compiler {} 它可以修饰在类上,也可以修饰在方法上面。这两者有什么区别呢?
简单来说,放在类上,说明当前类是一个确定的自适应扩展点的类。如果是放在方法级别,那么需要生成一个动态字节码,来进行转发。
比如拿 Protocol 这个接口来说,它里面定义了export和refer 两个抽象方法,这两个方法分别带有@Adaptive的标识,标识是 一个自适应方法。
@SPI("dubbo") public interface Protocol { @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; } 我们知道 Protocol 是一个通信协议的接口,具体有多种实现,那么这个时候选择哪一种呢?
取决于我们在使用 dubbo 的时候所配置的协议名称。
而这里的方法层面的 Adaptive 就决定了当前这个方法会采用何种协议来发布服务。
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();这个方法中做两个事情
获得一个自适应扩展点实例实现依赖注入 getExtensionClasses()这个方法在上文中已经讲过了,会加载当前传入的类型的所有扩展点,保存在一个 hashmap 中
cachedAdaptiveClass, Adaptive 可以放在两个位置,一个是类级别,一个是方法级别。
那么这个 cachedAdaptiveClass很显然,就是放在类级别的 Adaptive,
org.apache.dubbo.common.extension.ExtensionLoader#loadClass
if (clazz.isAnnotationPresent(Adaptive.class)) { cacheAdaptiveClass(clazz); } private void cacheAdaptiveClass(Class<?> clazz) { if (cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (!cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getName() + ", " + clazz.getName()); } } 在加载完之后如果这个类有@Adaptive 标识,则会赋值给 cachedAdaptiveClass如果 cachedAdaptiveClass 不存在,dubbo 会动态生成一个代理类 Protocol$Adaptive. 前面的名字 protocol 是根据当前 ExtensionLoader 所加载的扩展点来定义的
动态生成字节码,然后进行动态加载。那么这个时候返回的 class,如果加载的是 Protocol.class, 应该是 Protocol$Adaptive 这个 cachedDefaultName 实际上就是扩展点接口的@SPI 注解对应的名字,如果此时加载的是 Protocol.class,那么 cachedDefaultName=dubbo
通过debug获取动态生成的代理类
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public java.util.List getServers() { throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } if (arg0.getUrl() == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) { throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])"); } org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException { if (arg1 == null) { throw new IllegalArgumentException("url == null"); } org.apache.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) { throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])"); } org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } } 在上述实例化ExtensionLoader的地方我们发现有一个实例化ExtensionFactory的地方
private ExtensionLoader(Class<?> type) { this.type = type; objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); } 然后通过 ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()去获得一个自适应的扩展点,进 入 ExtensionFactory 这个接口中,可以看到它是一个扩展点,并且有一个自己实现的自适应扩展点 AdaptiveExtensionFactory;
@Adaptive public class AdaptiveExtensionFactory implements ExtensionFactory { private final List<ExtensionFactory> factories; public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); } @Override public <T> T getExtension(Class<T> type, String name) { for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; } }他会从配置文件中读取对应的extensionfactory找到一个就返回了
自动激活扩展点,有点类似 springboot 的时候用到的 conditional,根据条件进行自动激活。
但是这里设计的初衷是,对 于一个类会加载多个扩展点的实现,这个时候可以通过自动激活扩展点进行动态加载, 从而简化配置我们的配置工作 @Activate 提供了一些配置来允许我们配置加载条件,比如 group 过滤,比如 key 过滤。
举个例子,我们可以看看org.apache.dubbo.Filter这个类,它有非常多的实现,比如说 CacheFilter,这个缓存过滤器,配置信息 如下
group 表示客户端和和服务端都会加载,value 表示 url 中有 cache_key 的时候
@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY) public class CacheFilter implements Filter {}从代码中可以看出来,其会对标记了Activate注解的类进行判断和排序满足条件的才能被使用
public class AdaptiveBootStrap { public static void main(String[] args) { ExtensionLoader<Filter> loader = ExtensionLoader.getExtensionLoader( Filter.class); URL url = new URL("", "", 0); url = url.addParameter("cache", "cache"); //有和没有的区别 List<Filter> filters = loader.getActivateExtension(url, "cache"); System.out.println(filters.size()); } }当前文档中当我们的url中有满足cacheFilter中的key的时候则会把对应的filter加载出来