Dubbo源码分析

James 2019年11月21日 295次浏览

Dubbo SPI

Dubbo SPI使用以及规范

  • 创建接口并且加上@SPI注解表示该接口是一个Dubbo扩展点,将该扩展点打成一个jar包发布
@SPI
public interface IHelloService {
    String sayHello(String msg);
}
  • 在需要实现的扩展插件项目中依赖以上接口扩展点,并且实现该接口扩展点
public class HelloServiceImpl implements IHelloService {
    @Override
    public String sayHello(String msg) {
        return "Dubbo SPI Hello:" + msg;
    }
}

并且在resources目录先创建META-INF/dubbo/(META-INF/dubbo/;META-INF/dubbo/internal/;META-INF/services/;任选一个)目录,并且在该目录下创建以扩展点接口为名称的文件:xyz.easyjava.dubbo.spi.extend.service.IHelloService
在该文件中填写该扩展点实现的名称以及实现类的全路径,例如:

hello=xyz.easyjava.dubbo.spi.achieve.service.HelloServiceImpl
hello2=xyz.easyjava.dubbo.spi.achieve.service.HelloServiceImpl2
  • 现在就可以在需要使用该扩展的地方使用了,方式如下:
public class DubboSpiTest {
    public static void main(String[] args) {
        IHelloService extension = ExtensionLoader.getExtensionLoader(IHelloService.class).getExtension("hello2");
        System.out.println(extension.sayHello("MrAToo"));
        IHelloService adaptiveExtension = ExtensionLoader.getExtensionLoader(IHelloService.class).getAdaptiveExtension();
        System.out.println(adaptiveExtension.sayHello("MrAToo2"));
    }
}

源码分析

先从ExtensionLoader.getExtensionLoader(IHelloService.class).getAdaptiveExtension();开始

  1. 首先是调用了ExtensionLoader类的静态方法getExtensionLoader(IHelloService.class),在该方法中除了校验,主要是实例化了一个ExtensionLoader实例,
    并且在ExtensionLoader的构造方法中通过objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    创建了一个objectFactory对象,该对象是一个ExtensionFactory
  2. 得到ExtensionLoader实例对象过后,调用了该对象的getAdaptiveExtension()方法,在该方法中调用createAdaptiveExtension()创建实例,在createAdaptiveExtension()里面调用
    injectExtension((T) getAdaptiveExtensionClass().newInstance());,该方法是一个注入的方法,先不看injectExtension()方法是如何注入的,我们先看实例是如何创建的,很显然,实例作为injectExtension()
    方法的参数传入,那么getAdaptiveExtensionClass().newInstance()这句代码中的getAdaptiveExtensionClass()方法返回的Class中T是如何确定的。在getAdaptiveExtensionClass()方法中,首先调用了getExtensionClasses();
    然后判断cachedAdaptiveClass是否为null,如果不为null,则直接返回cachedAdaptiveClass,那么再看看getExtensionClasses();方法中又干了什么,在该方法中又调用了loadExtensionClasses(),下面来看看该方法的代码:
private Map<String, Class<?>> loadExtensionClasses() {
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if (defaultAnnotation != null) {
        String value = defaultAnnotation.value();
        if ((value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if (names.length == 1) {
                cachedDefaultName = names[0];
            }
        }
    }

    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

首先是判断type的类对象是否包含@SPI注解,如果包含该注解,则将该注解的value值放到cachedDefaultName属性中(该属性在createAdaptiveExtensionClassCode方法中使用到,可以通过getDefaultExtensionName方法获取默认扩展点,如果自适应扩展点中URL协议为空该值可以作为默认协议),
最后调用loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());分别加载META-INF/dubbo/;META-INF/dubbo/internal/;META-INF/services/;三个目录下的文件,最终会调用一下方法

/**
 * extensionClasses: 扩展类集合
 * resourceURL: 资源URL
 * clazz: 扩展类Class(实现了扩展接口的类,配置在接口文件中的Class)
 * name: 名称
 */
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error when load extension class(interface: " +
                type + ", class line: " + clazz.getName() + "), class "
                + clazz.getName() + "is not subtype of interface.");
    }
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        if (cachedAdaptiveClass == null) {
            cachedAdaptiveClass = clazz;
        } else if (!cachedAdaptiveClass.equals(clazz)) {
            throw new IllegalStateException("More than 1 adaptive class found: "
                    + cachedAdaptiveClass.getClass().getName()
                    + ", " + clazz.getClass().getName());
        }
    } else if (isWrapperClass(clazz)) {
        Set<Class<?>> wrappers = cachedWrapperClasses;
        if (wrappers == null) {
            cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
            wrappers = cachedWrapperClasses;
        }
        wrappers.add(clazz);
    } else {
        clazz.getConstructor();
        if (name == null || name.length() == 0) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }
        String[] names = NAME_SEPARATOR.split(name);
        if (names != null && names.length > 0) {
            Activate activate = clazz.getAnnotation(Activate.class);
            if (activate != null) {
                cachedActivates.put(names[0], activate);
            } else {
                // support com.alibaba.dubbo.common.extension.Activate
                com.alibaba.dubbo.common.extension.Activate oldActivate = clazz.getAnnotation(com.alibaba.dubbo.common.extension.Activate.class);
                if (oldActivate != null) {
                    cachedActivates.put(names[0], oldActivate);
                }
            }
            for (String n : names) {
                if (!cachedNames.containsKey(clazz)) {
                    cachedNames.put(clazz, n);
                }
                Class<?> c = extensionClasses.get(n);
                if (c == null) {
                    extensionClasses.put(n, clazz);
                } else if (c != clazz) {
                    throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                }
            }
        }
    }
}
  1. 先看第一个判断clazz.isAnnotationPresent(Adaptive.class),如果扩展类包含@Adaptive注解,则将该扩展作为自定义适配扩展点,赋值给cachedAdaptiveClass,前面提到在getExtensionClasses方法中,如果cachedAdaptiveClass值不为null,则直接返回,
    所以,当实现接口的类上有@Adaptive注解,则getAdaptiveExtension();返回的实例就是当前实例,即自定义适配扩展点。
  2. 再看第二个判断isWrapperClass(clazz),判断当前这个扩展是不是一个wrapper,如果是,则将该扩展类放入到cachedWrapperClasses中,该变量在getExtension方法调用链createExtension方法中被使用,大概源码内容为,如果cachedWrapperClasses变量有值,则需要包装原始对象。
  3. 如果以上两个条件都不成立,则走else逻辑,在该逻辑中,首先判断当前扩展类中是否包含@Activate注解,如果包含,则put到cachedActivates中。
    在判断cachedNames中是否包含当前扩展类的类对象,如果不存在,则将class放到cachedNames里面,最后循环将name作为key,class作为value放到extensionClasses中。

我们在回过头来看,在getAdaptiveExtensionClass方法中,如果cachedAdaptiveClass为null,则会调用createAdaptiveExtensionClass方法,并且将该方法的返回值放入到cachedAdaptiveClass中,然后返回。
createAdaptiveExtensionClass方法中,通过调用createAdaptiveExtensionClassCode方法返回一串代码,然后动态编译生成Class对象,然后返回。
那么在createAdaptiveExtensionClassCode(该方法主要是生成一个代理类)中,这串代码到底是什么,又是如何生成的呢?
首先,dubbo只会为该接口中带有@Adaptive注解的方法进行代理,如果该接口中没有带@Adaptive注解的方法,则会抛出异常,并且,Dubbo是一个基于URL驱动的RPC框架,方法中标注有@Adaptive注解的方法参数上必须
带有java.net.URL参数,否则,会抛出异常。
生成的代理类代码如下:

package xyz.easyjava.dubbo.spi.achieve.service;

import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy() {throw new UnsupportedOperationException("method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

从上面生成的类来看,dubbo对扩展接口上带有@Adaptive注解的方法进行了代理,没有标注@Adaptive的方法,直接抛出UnsupportedOperationException异常。被代理的方法通过URL协议来获取一个扩展点。

接下来,分析injectExtension()方法,也就是Dubbo中的依赖注入,Duboo支持Spring的依赖注入以及Dubbo自己的SPI自适应扩展点。
在该方法中可以看出,当前自适应扩展点中是否包含一个setter方法,有且只有一个参数的public的方法,并且该方法没有标注@DisableInject注解,那么Dubbo会为该自适应扩展点依赖注入。
被注入的对象核心代码在objectFactory.getExtension(pt, property)通过objectFactorygetExtension方法获得被注入的对象,然后放到当前自适应扩展点,实现依赖注入。
那么objectFactory对象是什么,在什么时候被实例化的?
还记得在最开始通过getExtensionLoader(IHelloService.class)获得一个ExtensionLoader对象的时候,由于ExtensionLoader类的构造方法是私有化的,所以在getExtensionLoader方法中
创建了ExtensionLoader对象,然而就在这个私有化的构造方法中,有这样一句代码

objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());

这就是objectFactory对象实例化的地方。
这里有创建了一个ExtensionFactory类的ExtensionLoader对象,并且将这个对象缓存在了EXTENSION_LOADERS(所以,整个运行过程中ExtensionFactory只有一个,并且都是AdaptiveExtensionFactory),然后调用了该对象的getAdaptiveExtension方法,该方法前面已经分析过来,
返回一个自适应扩展点。然后我们看ExtensionFactory的实现类,有一个名为AdaptiveExtensionFactory的自适应扩展点(因为该类上面标注了@Adaptive注解),所以我们可以发现objectFactory
对象的实例其实就是AdaptiveExtensionFactory类的实例对象。回到injectExtension方法的objectFactory.getExtension(pt, property)代码上,这里实际上调用的就是AdaptiveExtensionFactory
里面的getExtension方法。该方法如下:

public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

其中factories就是所有的ExtensionFactory类的扩展,从所有的扩展点中任意返回一个null的实例返回,dubbo默认有SpringExtensionFactorySPIExtensionFactory两个,SpringExtensionFactory的实现
就是从Spring的IOC容器中拿到对象注入。如果被注入对象类上标注了@SPI注解,那么最终还是交给SPIExtensionFactory对象去处理,该类里面有是通过ExtensionLoader得到一个自适应扩展点。到此,Dubbo的依赖注入完成。
类图:
ExtensionLoader

服务发布流程

Dubbo是阿里巴巴依赖Spring开源的RPC框架,至于为什么要依赖Spring我们不去深究,大概是因为Spring优秀的IOC,又或者是AOP,介于Spring的高度抽象,灵活的设计模式,便于去扩展,所以,Dubbo基于Spring的扩展区实现
Dubbo基于Spring扩展的NameSpaceHandler,Spring容器在启动的时候会调用DubboNamespaceHandlerinit()方法,该方法主要是解析Spring配置文件中的Dubbo扩展标签,将其转换成BeanDefinition,以便Spring容器进行管理。
Dubbo服务的发布流程是从ServiceBean开始的,因为该类实现了接口InitializingBean,该接口会在依赖注入完成过后调用afterPropertiesSet方法,而afterPropertiesSet方法就是Dubbo启动的关键。
首先在afterPropertiesSet方法中经过一些校验,在最后几行代码中,判断,是否支持SpringListener,如果不支持这调用export方法,如果支持,则会在Spring启动过程中执行ServiceBeanonApplicationEvent方法,总之都会调用到export方法,
export方法中主要是调用ServiceBean父类ServiceConfigexport方法,在该方法中,首先也是一堆的校验,最后调用doExport方法,继续往下看,doExportUrls()方法中首先是将所有的注册中心配置拼装成一个URL集合,类似如下:
registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=easyjava-dubbo-provider&dubbo=2.0.2&pid=205792&registry=zookeeper&release=2.7.0&timestamp=1574749134905,然后用循环的方式调用doExportUrlsFor1Protocol
该方法主要作用是将服务拼装成一个URL,如下:

dubbo://10.98.217.74:20880/xyz.easyjava.dubbo.api.IHelloService?anyhost=true&application=easyjava-dubbo-provider&bean.name=xyz.easyjava.dubbo.api.IHelloService&bind.ip=10.98.217.74&bind.port=20880&dubbo=2.0.2&generic=false&interface=xyz.easyjava.dubbo.api.IHelloService&methods=sayHello&pid=205792&release=2.7.0&side=provider&timestamp=1574749545179

最后将调用一下这句代码

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);

proxyFactory是一个自适应扩展点,是ServiceConfig的成员变量
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
ProxyFactory默认扩展点是JavassistProxyFactory,并且该扩展点有一个包装器StubProxyFactoryWrapper,所以,proxyFactory实际上是StubProxyFactoryWrapper(JavassistProxyFactory())
调用StubProxyFactoryWrapper(JavassistProxyFactory())getInvoker方法,实际上最终会调用到JavassistProxyFactorygetInvoker方法,传入三个参数,第一个ref是当前服务接口的实现类,
例如:HelloServiceImpl,第二个参数(Class) interfaceClass是当前服务接口的类对象,第三个参数是注册中心加上服务地址拼接成的一个注册中心地址,服务地址作为注册中心的export参数,如下:

registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=easyjava-dubbo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.98.217.74%3A20880%2Fxyz.easyjava.dubbo.api.IHelloService%3Fanyhost%3Dtrue%26application%3Deasyjava-dubbo-provider%26bean.name%3Dxyz.easyjava.dubbo.api.IHelloService%26bind.ip%3D10.98.217.74%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dxyz.easyjava.dubbo.api.IHelloService%26methods%3DsayHello%26pid%3D205792%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1574749545179&pid=205792&registry=zookeeper&release=2.7.0&timestamp=1574749134905

在该方法中,主要做了两件事情:

  1. 创建一个当前实例对象的Wrapper(代理对象),这里为什么需要有这样一层包装,我猜想的话应该是Dubbo的调用是通过URL进行的,我们可以方便的通过传入参数来决定调用哪个方法,我们通过Arthas来看一下Wrapper对象代码:
package org.apache.dubbo.common.bytecode;

import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator;
import org.apache.dubbo.common.bytecode.NoSuchMethodException;
import org.apache.dubbo.common.bytecode.NoSuchPropertyException;
import org.apache.dubbo.common.bytecode.Wrapper;
import xyz.easyjava.dubbo.provider.service.HelloServiceImpl;

public class Wrapper1
extends Wrapper
implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    @Override
    public String[] getPropertyNames() {
        return pns;
    }

    @Override
    public boolean hasProperty(String string) {
        return pts.containsKey(string);
    }

    public Class getPropertyType(String string) {
        return (Class)pts.get(string);
    }

    @Override
    public String[] getMethodNames() {
        return mns;
    }

    @Override
    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    @Override
    public void setPropertyValue(Object object, String string, Object object2) {
        try {
            HelloServiceImpl helloServiceImpl = (HelloServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class xyz.easyjava.dubbo.provider.service.HelloServiceImpl.").toString());
    }

    @Override
    public Object getPropertyValue(Object object, String string) {
        try {
            HelloServiceImpl helloServiceImpl = (HelloServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class xyz.easyjava.dubbo.provider.service.HelloServiceImpl.").toString());
    }

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        HelloServiceImpl helloServiceImpl;
        try {
            helloServiceImpl = (HelloServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return helloServiceImpl.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class xyz.easyjava.dubbo.provider.service.HelloServiceImpl.").toString());
    }
}
  1. 创建一个匿名AbstractProxyInvoker,且doInvoke方法实际上是调用的Wrapper代理对象的invokeMethod方法
    最后,该方法会返回一个AbstractProxyInvoker,其中doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments)中会调用代理Wrapper类中wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);方法,
    得到invoker过后,再次用DelegateProviderMetaDataInvoker包装一下,通过protocol.export(wrapperInvoker);传入DelegateProviderMetaDataInvoker实例对象,得到一个exporter,那么这里的protocol又是什么实现呢,
    Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();这里又是一个自适应扩展点,会生成一个Protocol$Adaptive,我们前面已经分析过了,Protocol$Adaptive
    通过当前协议动态获取一个扩展点,那么当前URL的协议是registry,所以,这里应该会调用到RegistryProtocolexport方法,在该方法中,会调用getRegistryUrl()方法,这个方法将注册中心协议从registry改为
    URL中registry参数值作为协议头,如果不存在则默认使用dubbo注册中心。
    拿到注册中心和服务发布URL过后,该方法核心代码是final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);,这句代码就是暴露服务的关键,服务暴露过后,该方法中还有一句核心代码
    register(registryUrl, registeredProviderUrl);,这句代码就是将服务地址注册到注册中心,我们一个一个的来分析,dubbo究竟是如何发布服务并且将服务URL注册到注册中心的。
    首先是服务暴露,通过查看doLocalExport方法,该方法需要两个参数,当前invokerproviderUrl服务地址,该方法源码:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

我们可以看到protocol.export(invokerDelegete),这里的protocol是什么取决于invokerDelegete中URL协议是什么,这里显然URL是服务地址,所以协议应该是dubbo,所以这里的protocol最终得到的就是
DubboProtocol,查看DubboProtocol中的export方法,该方法中会调用openServer(url);,传入服务暴露地址,在openServer(url);方法中首先从缓存中获取一个server,如果缓存中没有,则创建一个,
那么我们看server是如何创建的,参看createServer方法,

ExchangeServer server;
try {
    server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

通过Exchangers.bind(url, requestHandler);得到一个server,该方法最终会调用到HeaderExchanger中的bind方法,到这里还没完,在HeaderExchangerbind方法中创建一个HeaderExchangeServer对象,
该对象需要一个Server参数,这个ServerTransporters.bind中得来,这里又是一个自适应扩展点,但最终会调到NettyTransporter中的bind方法,最终在这里new了一个NettyServer,发布服务。
接下来分析服务注册,在RegistryProtocolexport方法中,有这样一句代码final Registry registry = getRegistry(originInvoker);,这句代码的作用就是活的一个注册中心,我们来分析一下getRegistry方法,

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

从代码中可以看出,首先通过URL得到注册中心的协议地址,这个时候这里应该是zookeeper://...,然后通过registryFactory得到一个注册中心工厂对象,但是这里的registryFactory又是什么,改成员变量有一个setter方法,
可见,这里的registryFactory是依赖注入进来的,又是一个RegistryFactory$Adaptive,通过协议地址动态活的一个RegistryFactory,当前协议为zookeeper,所以这里的registryFactory就是ZookeeperRegistryFactory
然后调用ZookeeperRegistryFactorygetRegistry方法,发现该类中并没有这个方法,所以会调用父类AbstractRegistryFactorygetRegistry方法,这是一个模板方法,具体实现由子类完成,在该方法中registry = createRegistry(url);就是
由子类ZookeeperRegistryFactory实现的,实现如下:

@Override
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

回到RegistryProtocolexport方法中,得到一个ZookeeperRegistry注册中心过后,调用``方法,该方法实现如下:

public void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);
}

得到一个注册中心,并且注册,这里得到的注册中心和getRegistry方法得到注册中心方法一样,得到的都是ZookeeperRegistry,然后看ZookeeperRegistryregister方法,同理,ZookeeperRegistry中没有register则调用
父类FailbackRegistryregister方法,并且传入服务暴露URL,改方法又是一个模板方法,最终会调用doRegister,而这个方法在子类ZookeeperRegistry中实现,该方法如下:

@Override
public void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

这里直接创建一个zookeeper节点,即服务注册,生成的path路径为:

/dubbo/xyz.easyjava.dubbo.api.IHelloService/providers/dubbo%3A%2F%2F10.98.217.74%3A20880%2Fxyz.easyjava.dubbo.api.IHelloService%3Fanyhost%3Dtrue%26application%3Deasyjava-dubbo-provider%26bean.name%3Dxyz.easyjava.dubbo.api.IHelloService%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dxyz.easyjava.dubbo.api.IHelloService%26methods%3DsayHello%26pid%3D212032%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1574752829404

以上就是服务注册已经服务暴露全过程。

服务引用初始化过程

Dubbo服务引用的时机有两个,第一个是Spring容器在调用ReferenceBeanafterPropertiesSet方法时,第二个是ReferenceBean对应的服务在被注入到其他对象中时,两者的区别在于第一种是饿汉式的,第二种是懒汉式的,
Dubbo默认是懒汉式的,我们可以通过配置<dubbo:reference init='true'>来将其改为饿汉式。
不管服务引用是饿汉还是懒汉模式,Dubbo都会调用ReferenceBeangetObject方法,接下来我们就从getObject方法开始分析,在get方法中,如果ReferenceBean对应的服务引用对象ref已存在,则直接返回,
如果不存在,则先调用init方法,服务引用对象ref就是在这里面进行创建的。init方法中除了对配置解析拼接到map中以外,最重要的是ref = createProxy(map);方法,在createProxy方法中,
首先是判断是否是本地调用,如果是,则创建InjvmProtocolrefer方法创建InjvmInvoker,则读取直连配置或注册中心URL(这里需要注意的是,如果选择了服务直连的方式,注册中心将失效,该方式用于在开发阶段调试过后一定要记得将其关掉),
如果urls只要一个,则直接通过refProtocol.refer()调用,如果urls有多个,则循环调用refprotocol.refer并且将invoker放到invokersList中,最后调用cluster.join(new StaticDirectory(invokers))将多个invoker伪装成一个invoker,
并且传入一个StaticDirectory静态目录服务,因为这里要么是多个注册中心,要么是多个服务提供者,而这些invoker是不会动态变化的。这个时候的cluster是一个自适应扩展点Cluster$Adaptive,循环调用时需要注意,如果有多个注册中心,
则在URL中添加cluster参数,值为registryaware,同样调用cluster.join(new StaticDirectory(u,invokers)),不过这里在创建StaticDirectory静态目录服务的时候多传入了一个注册中心的URL。这里先不去深究。
接下来分析invoker的创建过程,invoker的创建是通过protocol.refer方法,protocol的实现有很多,常用的是RegistryProtocol(注册中心)和DubboProtocol(服务直连使用Dubbo协议),这里只分析这两种协议。

DubboProtocol

class DubboProtocol {
    //...
    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
    
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
    //...
}

DubboProtocol的refer方法很简单,直接创建一个DubboInvoker返回,但是这里值得注意的是getClients(url)方法,该方法中,首先会从url中读取参数connections,该参数在

<dubbo:reference id="helloService" interface="xyz.easyjava.dubbo.api.IHelloService" url="dubbo://10.98.217.74:20880;dubbo://10.98.217.73:20880" connections="0"/>

中被指定,如果不指定,则默认为0,该值有三个,分别是:

  • 0:表示该服务使用JVM共享长连接(缺省)
  • 1:表示该服务使用独立一条长连接
  • 2:表示该服务使用独立两条长连接,这种方式一般使用于负载较大的服务。
    所以,这里默认使用JVM共享长连接的方法,这个时候代码会执行getSharedClient(url),该方法中首先从缓存中取,如果缓存未命中,则调用initClient(url)创建一个新的ExchangeClient,最后通过ReferenceCountExchangeClient包装过后放入到缓存最后返回,
    再来看一下initClient方法,initClient方法首先判断客户端类型,默认为Netty,并且设置默认心跳间隔时间,最后判断是否延迟链接,如果是延迟连接,则创建LazyConnectExchangeClient,否则调用Exchangers.connect(url, requestHandler),
    延迟LazyConnectExchangeClient在发送请求之前调用Exchangers.connect(url, requestHandler)方法,所以,这里我们直接分析,实际上这里最终调用的是HeaderExchanger.connect(URL url, ExchangeHandler handler)
class HeaderExchanger {
    //...
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
    //...
}

经过深入分析,Transporters.connect最终调用的是NettyTransporter.connect,方法中直接创建一个NettyClient,最终在NettyClient中连接目标服务,并且保存着客户端和服务间的channel,建立长连接。

RegistryProtocol

RegistryProtocolrefer方法中,首先设置URL协议头,从registry改为Zookeeper,然后通过注册中心工厂得到一个注册中心对象,这里得到的是ZookeeperRegistry,最后通过group判断调用doRefer方法的cluster参数应该是哪一个。
这里并不影响主流程,我们接着看doRefer方法,方法代码如下:

class RegistryProtocol {
    //...
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
        }
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
    //...
}

以上代码首先创建注册中心目录服务,并且设置器注册中心对象ZookeeperRegistry和协议Protocol$Adaptive,然后组装消费者URLsubscribeUrl,注册到注册中心,构建路由器链,订阅providers,routers,configurators.
最后执行Invoker invoker = cluster.join(directory);并返回invoker,首先来分析,服务订阅过程

directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

服务订阅过程从directory.subscribe开始,在RegistryDirectory中调用registry.subscribe(url, this);,这里的registry是在doRefer方法中注入的,实例为ZookeeperRegistry,在ZookeeperRegistrysubscribe方法中
需要两个参数,一个URL和一个Listener,而这里传了this,说明这里this实现了NotifyListener接口,所以,这里传入的是this,接下来看ZookeeperRegistrysubscribe方法,发现该类中并没有此方法,那么必然会调父类的subscribe方法,
这里又是一个模板方法,最终还是会调用子类的实现doSubscribe方法,该方法中主要注册Zookeeper监听,如果有如果有节点变动,则会通知到ZookeeperRegistrynotify方法,传入listener,该方法调用链为:

ZookeeperRegistry.notify -> ZookeeperRegistry.doNotify -> AbstractRegistry.notify -> listener.notify(categoryList)(RegistryDirectory.notify)

categoryList:该参数为providers,routers,configurators节点下的所有URL地址。该方法中首先将注册中心读取到的URL转换成对象,比如Router,Configurator最后调用refreshInvoker,下面看一下refreshInvoker方法的实现:

class RegistryDirectory {
    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null");

        if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls
                .get(0)
                .getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.invokers = Collections.emptyList();
            routerChain.setInvokers(this.invokers);
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls == Collections.<URL>emptyList()) {
                invokerUrls = new ArrayList<>();
            }
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                        .toString()));
                return;
            }

            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
            routerChain.setInvokers(newInvokers);
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;

            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
}

首先判断协议地址是否只有一个,并且协议为empty协议,如果是,销毁所有invokers,否则将URL列表转成Invoker(这里根据协议会创建DubboInvoker)列表得到newUrlInvokerMap,最后将赋值给urlInvokerMap,达到刷新urlInvokerMap的目的,
并且关闭关闭未使用的Invoker,回到RegistryProtocoldoRefer方法的Invoker invoker = cluster.join(directory);,这行代码将服务目录传入,其目的是做客户端负载均衡和服务容错,总之提供集群服务治理支持,这块后面单独分析。

创建代理

不管是DubboProtocol还是RegistryProtocol最后都会返回一个Invoker,这个Invoker就是调用服务的客户端,里面封装了和服务端的长连接。但是如果我们直接将Invoker对象拿到业务代码中调用,这样对于我们的业务来说侵入性太高,
所以Dubbo使用代理的方式实现业务的零侵入,回到ReferenceConfigcreateProxy方法,在最后一行代码return (T) proxyFactory.getProxy(invoker);返回一个代理对象,这里的proxyFactory是一个JavassistProxyFactory,
这里首先会调用父类AbstractProxyFactorygetProxy方法,然后调用JavassistProxyFactorygetProxy方法,代理类就是在该方法中被生成的,接下来看下生成的代理类:

package org.apache.dubbo.common.bytecode;

import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.apache.dubbo.common.bytecode.ClassGenerator;
import xyz.easyjava.dubbo.api.IHelloService;

public class proxy0
implements ClassGenerator.DC,
EchoService,
IHelloService {
    public static Method[] methods;
    private InvocationHandler handler;

    public String sayHello(String string) {
        Object[] arrobject = new Object[]{string};
        Object object = this.handler.invoke(this, methods[0], arrobject);
        return (String)object;
    }

    @Override
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }

    public proxy0() {
    }

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }
}

该代理类在创建实例的时候传入了一个InvokerInvocationHandler(invoker),所以,服务调用时其实最终会调用到InvokerInvocationHandlerinvoke方法:

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }

        return invoker.invoke(createInvocation(method, args)).recreate();
    }

    private RpcInvocation createInvocation(Method method, Object[] args) {
        RpcInvocation invocation = new RpcInvocation(method, args);
        if (RpcUtils.hasFutureReturnType(method)) {
            invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        }
        return invocation;
    }

}

服务目录

服务目录在前面将RegistryProtocol引入的时候已经讲过了,其主要作用是列出所有invoker,下面来看一下依赖关系:
DirectoryDependency
由上图可见,AbstractDirectory里面实现了list服务列举方法,该方法肯定是一个模板方法,具体实现由子类提供,接下里分析它的两个实现StaticDirectoryRegistryDirectory,顾名思义,StaticDirectory是静态目录服务,
invokers是固定不变的,该目录适用于注册中心,比如有三个注册中心,那么这三个注册中心在运行过程中是不会动态改变的。RegistryDirectory是动态的,具体的invokers会根据服务注册中心的变动而变动。
StaticDirectory实现很简单,这里不做过多分析,重点分析RegistryDirectoryRegistryDirectory实现了NotifyListener接口,改接口只有一个方法notify,该方法会在ZookeeperRegistrydoSubscribe(即服务订阅)时被注册,
zookeeper注册中心的服务节点变动时异步通知调用,得到更新过后的URL过后,会调用refreshInvoker方法刷新Invoker列表,改方法在服务引入时分析过了,主要就是将URL列表转换成Invoker列表,放到一个MAP中来达到更新invoker的目的。
总结:服务目录可以看成是一个List<Invoker>

服务路由

服务路由的作用是根据用户配置的路由规则来筛选服务提供者。比如有这样一条规则:

host = 10.20.153.10 => host = 10.20.153.11

该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务,不可调用其他机器上的服务

服务集群

集群的工作可以分为两个阶段,第一:服务消费者初始化时创建ClusterInvoker对象,其目的是将Directory包装,伪装成一个invoker返回,第二:服务调用时,这个时候主要是调用AbstractClusterInvokerinvoke方法,该方法又是一个模板方法,
最终会调用具体实现类的doInvoke方法,在doInvoke方法中,封装了一些集群容错的机制,就拿缺省的FailoverClusterInvoker来分析,该方式在调用时出现错误,如果是业务异常,则直接抛出,如果不是业务异常,记录异常,然后重试,
重试次数可以在retries属性中指定,默认两次,不算第一次。

负载均衡

负载均衡是在服务调用过程中被确定的,具体在invoker.invoke方法中被初始化,这里的负载均衡器为LoadBalance$Adaptive,具体使用某个负载均衡器取决于用户配置,默认使用随机算法,这里不深入分析负载均衡算法。

服务调用过程

服务调用过程分为两个部分

  • 消费端发送请求
  • 服务端接受请求处理

消费端发送请求

在分析服务调用之前,先来看一下服务请求发送流程图
send-request-process
在前面服务引入创建代理时讲到,客户端是通过代理对象调用发送网络请求的,而代理对象是调用InvokerInvocationHandlerinvoke方法,所以,服务调用过程理应从这里开始。
该方法中,首先会封装请求参数Invocation对象,然后调用invokerinvoke方法,这里前面分析得出这里的invoker应该是MockClusterInvoker,这里不分析Dubbo的Mock机制,直接调用FailoverClusterInvokerinvoker方法,
改方法在父类AbstractClusterInvoker中,在该方法中,主要是初始化了负载均衡器RandomLoadBalance,然后调用FailoverClusterInvokerdoInvoke方法,在该方法中,首先通过负载均衡器拿到一个invoker,这里的invoker是我们在目录服务中RegistryDirectory回调
notify通知中创建的,这里创建的是DubboInvoker,所以这里调用AbstractInvoker中的invoke方法,然后该方法会调用DubboInvokerdoInvoke方法,当然,这中间会调用一些Filter这里不展开分析。
doInvoke方法中,会拿到ReferenceCountExchangeClient,然后调用request方法,这里的调用链比较长,如下:

ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request -> HeaderExchangeClient.send -> HeaderExchangeChannel.send -> NettyChannel.send -> NioSocketChannel.writeAndFlush

由上面调用链可以看出,最终会通过Netty的channel调用writeAndFlush发送数据,最后将结果返回。当然这里面的数据编解码序列化在这里不展开。

服务端接受请求处理

服务端接受请求的入口在NettyServerHandler类中的channelRead方法,先来看一下调用链

NettyServerHandler.channelRead -> AbstractPeer.received -> MultiMessageHandler.received -> HeartbeatHandler.received -> AllChannelHandler.received(该方法中会创建一个线程去执行) -> ChannelEventRunnable.run
-> DecodeHandler.received -> HeaderExchangeHandler.received -> HeaderExchangeHandler.handleRequest -> DubboProtocol$1.reply(这里的DubboProtocol$1是ExchangeHandlerAdapter的子类,定义在DubboProtocol中的匿名内部类,该方法中通过channel和invocation获取invoker对象)
-> DelegateProviderMetaDataInvoker.invoke -> Wrapper1.invokeMethod -> HelloServiceImpl.sayHello

代码分析到这里,整个服务请求接受处理差不多就调用完了。