常见的使用dubbo的方式就是通过spring配置文件进行配置。例如下面这样
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <dubbo:application name="helloservice-provider"/> <dubbo:registry address="zookeeper://localhost:2181"/> <dubbo:reference interface="com.zhuge.learn.dubbo.services.helloservice" check="false" id="helloservice"> <dubbo:method name="sayhello" retries="2"/> </dubbo:reference> </beans>
读过spring源码的应该知道,spring对于非默认命名空间的标签的解析是通过namespacehandlerresolver实现的,namespacehandlerresolver也算是一种spi机制,通过解析jar包中的meta-inf/spring.handlers文件,将所有的namespacehandler实现类以k-v的形式解析出来并放到内存中。所以要想扩展spring的命名空间,就要实现一个namespacehandler。
dubbo实现了自己的命名空间,对应的namespacehandler实现类是com.alibaba.dubbo.config.spring.schema.dubbonamespacehandler。这个类也很简单,就是定义了用于解析不同标签的beandefinition解析类。但是dubbo的实现稍有不同,它将所有标签的解析都放到同一个类同一个方法中,个人认为这种设计欠妥,不利于扩展新的标签。
使用过dubbo的都知道,如果我们要创建一个服务提供者,我们需要在配置文件中配置service标签,所以dubbo的服务导出一定与这个标签相关。查看dubbonamespacehandler代码,会发现,服务导出的逻辑主要是由servicebean实现的,所以接下来我们就以servicebean为入口,一步步来分析dubbo的服务导出过程。
servicebean继承了serviceconfig类,同时实现了一大堆接口,这些接口基本上都与spring框架相关。其中applicationlistener
真正的导出服务的逻辑在父类方法中 我们直接进入核心代码, private void doexporturls() {
从这段代码,我们可以看出来,dubbo会对配置的每个协议类型,每个注册中心全部进行服务导出和注册,服务导出和注册的次数=协议类型数*注册中心数 这段代码主要封装了参数解析,和url拼装的逻辑。 所以默认实现类是javassistproxyfactory 真正负责代码生成的是wrapper.makewrapper方法。这段代码比较长,逻辑比较复杂,而且代码生成的逻辑又很繁琐,其实也没有什么高深的技术,所以我决定直接用单元测试来生成一段代码,这样就能直观第理解生成的代码长什么样 首先给出一个被代理的接口 下面就是wrapper.makewrapper方法最后生成的代码的样子, 其中方法中的参数用$1,$2这种形式表示,猜测在javassist中会进行处理。 这个方法带有adaptive注解,是一个自适应方法,自适应扩展类,我们之前分析过,通过入参获取url,通过url获取指定key的值,用这个获取到的值作为扩展名加载扩展类,然后调用这个扩展类的方法。 所以接下来我们就分析一下registryprotocol.export的导出过程 我们略过一些不太重要的,这个方法主要就做了两件事: 就像注释中说的,通过自适应机制,根据运行时传入的invoker中的url动态决定使用哪个protocol,以常用的dubbo协议为例,对应的实现类是org.apache.dubbo.rpc.protocol.dubbo.dubboprotocol 主要的逻辑在openserver中 这个方法的主要作用就是缓存的双重检查锁,创建的服务的代码在createserver中 终于到正题了 默认的exchanger是headerexchanger, 服务启动逻辑在transporters.bind中 gettransporter方法返回的是自适应扩展类,会根据url决定使用哪个扩展类。 server参数的默认值是netty,所以我们分析一下nettytransporter 分析nettyserver构造器 主要逻辑:设置参数,然后打开服务。doopen方法由子类实现。 这里主要涉及到netty api的使用。设置boss线程池和worker线程池,然后通过启动类serverbootstrap绑定指定的host和port上,开始监听端口。 我只想说,这代码也太特么的深了。最初看到dubbo的架构图,就是那张十层的图时,我不太理解 ,看完这些代码我才明白,dubbo为什么能把层分得那么细,那么清晰。 另外,差点忘了一个重要内容,那就是netty的事件处理器,其实通过前面的层层调用我们可以发现,处理器类最开始在dubboprotocol被创建,沿着调用链一直传递到netty api。 这里面一个重要的方法就是reply方法,这个方法的主要内容就是检查参数类型,检查方法存不存在,然后调用原始的invoker。 接下来,我们回过头再来分析注册的逻辑,也就是向注册中心发送服务提供者信息。这部分的入口在 服务注册部分的逻辑不是很复杂,主要还是通过url中的protocol参数值通过自适应机制找到对应的registryfactory类,然后获取对应的registry类。onapplicationevent
public void onapplicationevent(contextrefreshedevent event) {
// 如果已经导出或者关闭服务,就忽略该事件
if (!isexported() && !isunexported()) {
if (logger.isinfoenabled()) {
logger.info("the service ready on spring started. service: " + getinterface());
}
export();
}
}
serviceconfig.export
// 这是一个同步方法,保证多线程情况下不会同时进行服务导出
public synchronized void export() {
// 检查一些配置是否为空,对于空的配置创建默认的配置
checkandupdatesubconfigs();
if (!shouldexport()) {
return;
}
if (shoulddelay()) {
// 延迟导出服务
delayexportexecutor.schedule(this::doexport, delay, timeunit.milliseconds);
} else {
doexport();
}
}
protected synchronized void doexport() {
// 首先做一些状态检查
// 如果已经反导出服务,说明服务已经被关闭
if (unexported) {
throw new illegalstateexception("the service " + interfaceclass.getname() + " has already unexported!");
}
// 如果已经导出过了,就不需要重复导出了
if (exported) {
return;
}
exported = true;
// 如果服务名为空,以服务接口名作为服务名称
if (stringutils.isempty(path)) {
path = interfacename;
}
doexporturls();
}
doexporturls
// 加载所有的注册中心的url
list
// 如果配置了多个协议,那么每种协议都要导出,并且是对所有可用的注册url进行注册
for (protocolconfig protocolconfig : protocols) {
// 拼接服务名称,这里的path一般就是服务名
string pathkey = url.buildkey(getcontextpath(protocolconfig).map(p -> p + "/" + path).orelse(path), group, version);
// 服务提供者模型,用于全面描述服务提供者的信息
providermodel providermodel = new providermodel(pathkey, ref, interfaceclass);
applicationmodel.initprovidermodel(pathkey, providermodel);
// 导出这个服务提供者,
// 向所有的可用的注册中心进行注册
doexporturlsfor1protocol(protocolconfig, registryurls);
}
}doexporturlsfor1protocol
创建代理类由proxyfactory实现,
创建本地服务并注册到注册中心有registryprotocol实现// 导出服务的核心代码
private void doexporturlsfor1protocol(protocolconfig protocolconfig, list<url> registryurls) {
// 协议名称
string name = protocolconfig.getname();
// 如果没有配置协议名称,默认是dubbo
if (stringutils.isempty(name)) {
name = constants.dubbo;
}
map<string, string> map = new hashmap<string, string>();
// 设置side属性为provider,side表示服务提供者还是消费者
map.put(constants.side_key, constants.provider_side);
// 添加运行时信息,包括
// 1. dubbo协议的版本号,2.0.10 ~ 2.6.2
// 2. dubbo版本号
// 3. 时间戳 信息
// 4. jvm进程号
appendruntimeparameters(map);
// 添加applicationconfig, 配置属性
appendparameters(map, application);
// 添加moduleconfig配置属性,模块配置,覆盖全局配置
appendparameters(map, module);
// 添加providerconfig配置属性
appendparameters(map, provider, constants.default_key);
// 添加协议配置,覆盖前面的配置
appendparameters(map, protocolconfig);
// 添加当前服务的配置,service标签的配置,覆盖前面的配置
// 容易看出来,配置的优先级:service > protocol > provider > module > application
appendparameters(map, this);
if (collectionutils.isnotempty(methods)) {
// 添加方法配置
for (methodconfig method : methods) {
appendparameters(map, method, method.getname());
// 替换retry配置
string retrykey = method.getname() + ".retry";
if (map.containskey(retrykey)) {
string retryvalue = map.remove(retrykey);
if ("false".equals(retryvalue)) {
map.put(method.getname() + ".retries", "0");
}
}
// 添加方法参数配置
list<argumentconfig> arguments = method.getarguments();
if (collectionutils.isnotempty(arguments)) {
for (argumentconfig argument : arguments) {
// convert argument type
// 添加方法参数配置
if (argument.gettype() != null && argument.gettype().length() > 0) {
method[] methods = interfaceclass.getmethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
string methodname = methods[i].getname();
// target the method, and get its signature
if (methodname.equals(method.getname())) {
class<?>[] argtypes = methods[i].getparametertypes();
// one callback in the method
// 只有一个回调
// 添加方法参数配置
if (argument.getindex() != -1) {
if (argtypes[argument.getindex()].getname().equals(argument.gettype())) {
appendparameters(map, argument, method.getname() + "." + argument.getindex());
} else {
throw new illegalargumentexception("argument config error : the index attribute and type attribute not match :index :" + argument.getindex() + ", type:" + argument.gettype());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
class<?> argclazz = argtypes[j];
if (argclazz.getname().equals(argument.gettype())) {
appendparameters(map, argument, method.getname() + "." + j);
if (argument.getindex() != -1 && argument.getindex() != j) {
throw new illegalargumentexception("argument config error : the index attribute and type attribute not match :index :" + argument.getindex() + ", type:" + argument.gettype());
}
}
}
}
}
}
}
} else if (argument.getindex() != -1) {
appendparameters(map, argument, method.getname() + "." + argument.getindex());
} else {
throw new illegalargumentexception("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// 是否是泛化服务
if (protocolutils.isgeneric(generic)) {
map.put(constants.generic_key, generic);
map.put(constants.methods_key, constants.any_value);
} else {
// 添加版本信息
string revision = version.getversion(interfaceclass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 设置方法名
string[] methods = wrapper.getwrapper(interfaceclass).getmethodnames();
if (methods.length == 0) {
logger.warn("no method found in service interface " + interfaceclass.getname());
map.put(constants.methods_key, constants.any_value);
} else {
map.put(constants.methods_key, stringutils.join(new hashset<string>(arrays.aslist(methods)), ","));
}
}
// 添加token信息
if (!configutils.isempty(token)) {
if (configutils.isdefault(token)) {
map.put(constants.token_key, uuid.randomuuid().tostring());
} else {
map.put(constants.token_key, token);
}
}
// export service
// 导出服务
// 添加bind.ip属性,并返回用于注册的ip
string host = this.findconfigedhosts(protocolconfig, registryurls, map);
// 添加bind.port属性,并返回用于注册的port
integer port = this.findconfigedports(protocolconfig, name, map);
// 根据前面获取的参数信息创建一个url
url url = new url(name, host, port, getcontextpath(protocolconfig).map(p -> p + "/" + path).orelse(path), map);
// 对url进行额外的配置
if (extensionloader.getextensionloader(configuratorfactory.class)
.hasextension(url.getprotocol())) {
url = extensionloader.getextensionloader(configuratorfactory.class)
.getextension(url.getprotocol()).getconfigurator(url).configure(url);
}
// 获取服务作用于,导出到本地还是远程
string scope = url.getparameter(constants.scope_key);
// don't export when none is configured
// scope属性值是none的不进行导出,直接忽略
if (!constants.scope_none.equalsignorecase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// 只要scope属性不等于remote就会进行本地导出
if (!constants.scope_remote.equalsignorecase(scope)) {
exportlocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// 只要scope属性不等于local就会进行远程导出
if (!constants.scope_local.equalsignorecase(scope)) {
if (logger.isinfoenabled()) {
logger.info("export dubbo service " + interfaceclass.getname() + " to url " + url);
}
if (collectionutils.isnotempty(registryurls)) {
// 对每一个注册中心都进行导出
for (url registryurl : registryurls) {
// 添加dynamic属性的参数
url = url.addparameterifabsent(constants.dynamic_key, registryurl.getparameter(constants.dynamic_key));
// 加载监控中心的url,监控中心也是一个服务提供者
url monitorurl = loadmonitor(registryurl);
if (monitorurl != null) {
// 添加参数到url中
url = url.addparameterandencoded(constants.monitor_key, monitorurl.tofullstring());
}
if (logger.isinfoenabled()) {
logger.info("register dubbo service " + interfaceclass.getname() + " url " + url + " to registry " + registryurl);
}
// for providers, this is used to enable custom proxy to generate invoker
// 获取用户配置的代理
string proxy = url.getparameter(constants.proxy_key);
if (stringutils.isnotempty(proxy)) {
registryurl = registryurl.addparameter(constants.proxy_key, proxy);
}
// ref属性是通过spring容器的ioc特性自动注入的,
// 在dubbobeandefinitionparser中对该属性进行了解析
invoker<?> invoker = proxyfactory.getinvoker(ref, (class) interfaceclass, registryurl.addparameterandencoded(constants.export_key, url.tofullstring()));
delegateprovidermetadatainvoker wrapperinvoker = new delegateprovidermetadatainvoker(invoker, this);
exporter<?> exporter = protocol.export(wrapperinvoker);
exporters.add(exporter);
}
} else {
invoker<?> invoker = proxyfactory.getinvoker(ref, (class) interfaceclass, url);
delegateprovidermetadatainvoker wrapperinvoker = new delegateprovidermetadatainvoker(invoker, this);
exporter<?> exporter = protocol.export(wrapperinvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* servicedata store
*/
metadatareportservice metadatareportservice = null;
if ((metadatareportservice = getmetadatareportservice()) != null) {
metadatareportservice.publishprovider(url);
}
}
}
// 记录已经导出的
this.urls.add(url);
}
proxyfactory
@spi("javassist")
public interface proxyfactory {
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@adaptive({constants.proxy_key})
<t> t getproxy(invoker<t> invoker) throws rpcexception;
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@adaptive({constants.proxy_key})
<t> t getproxy(invoker<t> invoker, boolean generic) throws rpcexception;
/**
* create invoker.
*
* @param <t>
* @param proxy
* @param type
* @param url
* @return invoker
*/
// 这里规定了以proxy为key去url中查找扩展名,如果没有设置就用默认扩展名,
// 默认扩展名是由spi注解确定的,proxyfactory的默认扩展名就是javassist
// 查看meta-inf/dubbo/internal/org.apache.dubbo.rpc.proxyfactory文件,我们知道
// javassist对应的扩展类就是org.apache.dubbo.rpc.proxy.javassist.javassistproxyfactory
@adaptive({constants.proxy_key})
<t> invoker<t> getinvoker(t proxy, class<t> type, url url) throws rpcexception;
}
javassistproxyfactory.getinvoker
@override
public <t> invoker<t> getinvoker(t proxy, class<t> type, url url) {
// todo wrapper cannot handle this scenario correctly: the classname contains '$'
// wrapper不能正确处理类名中带有$的情况
// 获取一个包装类,用来根据传入的参数调用原始对象的不同方法
// 起到的作用就是方法路由。
// jdk动态代理使用反射调用不同的方法,效率较低。
// 而javaassist通过方法名以及参数个数和参数类型进行判断具体调用哪个方法,效率更高
final wrapper wrapper = wrapper.getwrapper(proxy.getclass().getname().indexof('$') < 0 ? proxy.getclass() : type);
// 生成一个invoker,内部仅仅是调用wrapper.invokemethod方法
return new abstractproxyinvoker<t>(proxy, type, url) {
@override
protected object doinvoke(t proxy, string methodname,
class<?>[] parametertypes,
object[] arguments) throws throwable {
return wrapper.invokemethod(proxy, methodname, parametertypes, arguments);
}
};
}
wrapper.makewrapper
public interface i2 {
void setname(string name);
void hello(string name);
int showint(int v);
float getfloat();
void setfloat(float f);
}
public class wrapper0 extends wrapper {
public static string[] pns;
public static java.util.map pts;
public static string[] mns;
public static string[] dmns;
public static class[] mts0;
public static class[] mts1;
public static class[] mts2;
public static class[] mts3;
public static class[] mts4;
public static class[] mts5;
public string[] getpropertynames() {
return pns;
}
public boolean hasproperty(string n) {
return pts.containskey($1);
}
public class getpropertytype(string n) {
return (class) pts.get($1);
}
public string[] getmethodnames() {
return mns;
}
public string[] getdeclaredmethodnames() {
return dmns;
}
public void setpropertyvalue(object o, string n, object v) {
org.apache.dubbo.common.bytecode.i2 w;
try {
w = ((org.apache.dubbo.common.bytecode.i2) $1);
} catch (throwable e) {
throw new illegalargumentexception(e);
}
if ($2.equals("name")) {
w.setname((java.lang.string) $3);
return;
}
if ($2.equals("float")) {
w.setfloat(((number) $3).floatvalue());
return;
}
throw new org.apache.dubbo.common.bytecode.nosuchpropertyexception("not found property \"" + $2 + "\" field or setter method in class org.apache.dubbo.common.bytecode.i2.");
}
public object getpropertyvalue(object o, string n) {
org.apache.dubbo.common.bytecode.i2 w;
try {
w = ((org.apache.dubbo.common.bytecode.i2) $1);
} catch (throwable e) {
throw new illegalargumentexception(e);
}
if ($2.equals("float")) {
return ($w) w.getfloat();
}
if ($2.equals("name")) {
return ($w) w.getname();
}
throw new org.apache.dubbo.common.bytecode.nosuchpropertyexception("not found property \"" + $2 + "\" field or setter method in class org.apache.dubbo.common.bytecode.i2.");
}
public object invokemethod(object o, string n, class[] p, object[] v) throws java.lang.reflect.invocationtargetexception {
org.apache.dubbo.common.bytecode.i2 w;
try {
w = ((org.apache.dubbo.common.bytecode.i2) $1);
} catch (throwable e) {
throw new illegalargumentexception(e);
}
try {
if ("getfloat".equals($2) && $3.length == 0) {
return ($w) w.getfloat();
}
if ("setname".equals($2) && $3.length == 1) {
w.setname((java.lang.string) $4[0]);
return null;
}
if ("setfloat".equals($2) && $3.length == 1) {
w.setfloat(((number) $4[0]).floatvalue());
return null;
}
if ("hello".equals($2) && $3.length == 1) {
w.hello((java.lang.string) $4[0]);
return null;
}
if ("showint".equals($2) && $3.length == 1) {
return ($w) w.showint(((number) $4[0]).intvalue());
}
if ("getname".equals($2) && $3.length == 0) {
return ($w) w.getname();
}
} catch (throwable e) {
throw new java.lang.reflect.invocationtargetexception(e);
}
throw new org.apache.dubbo.common.bytecode.nosuchmethodexception("not found method \"" + $2 + "\" in class org.apache.dubbo.common.bytecode.i2.");
}
}
我们主要看invokemethod,逻辑相对还是很明了的,通过方法名和参数个数判断应该调用哪个方法。
到这里,invoker对象就创建完成了,接下来就进入到服务导出的部分。protocol.export
@adaptive
<t> exporter<t> export(invoker<t> invoker) throws rpcexception;
但是export方法上的注解并没有给出key,回想一下生成自适应扩展类代码的细节,当adaptive注解未指定key时,将接口名转换为key,protocol会被转换为protocol,而对于key为protocol的情况会直接调用url.getprotocol方法获取协议类型作为扩展名。
在loadregistries方法中加载注册url时,已经将url的protocol属性设为registry,也就是说会使用org.apache.dubbo.registry.integration.registryprotocol来进行服务导出,接下来我们就来分析这个类。registryprotocol.export
public <t> exporter<t> export(final invoker<t> origininvoker) throws rpcexception {
// 获取注册的url,并将protocol替换为相应的协议类型
// 前面讲本来的协议类型设置到registry参数中,而将protocol参数设置为registry,
// 这样做是为了在自适应扩展机制在查找扩展名时能够根据扩展名是registry找到registryprotocol
// 找到之后并且进入这个类的方法之后,自然需要再把协议类型设置回来
url registryurl = getregistryurl(origininvoker);
// url to export locally
// 获取服务提供者url,用于导出到本地
// 注册的url中的一个参数,即export参数的值
url providerurl = getproviderurl(origininvoker);
// subscribe the override data
// fixme when the provider subscribes, it will affect the scene : a certain jvm exposes the service and call
// the same service. because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// 获取订阅url,用于
final url overridesubscribeurl = getsubscribedoverrideurl(providerurl);
// 创建监听器
final overridelistener overridesubscribelistener = new overridelistener(overridesubscribeurl, origininvoker);
overridelisteners.put(overridesubscribeurl, overridesubscribelistener);
providerurl = overrideurlwithconfig(providerurl, overridesubscribelistener);
//export invoker
// 导出服务到本地
final exporterchangeablewrapper<t> exporter = dolocalexport(origininvoker, providerurl);
// url to registry
final registry registry = getregistry(origininvoker);
// 获取用于发送到注册中心的提供者url
final url registeredproviderurl = getregisteredproviderurl(providerurl, registryurl);
// 想服务提供者与消费者注册表中注册服务
providerinvokerwrapper<t> providerinvokerwrapper = providerconsumerregtable.registerprovider(origininvoker,
registryurl, registeredproviderurl);
//to judge if we need to delay publish
boolean register = registeredproviderurl.getparameter("register", true);
if (register) {
// 向注册中心注册
register(registryurl, registeredproviderurl);
providerinvokerwrapper.setreg(true);
}
// deprecated! subscribe to override rules in 2.6.x or before.
registry.subscribe(overridesubscribeurl, overridesubscribelistener);
exporter.setregisterurl(registeredproviderurl);
exporter.setsubscribeurl(overridesubscribeurl);
//ensure that a new exporter instance is returned every time export
return new destroyableexporter<>(exporter);
}
dolocalexport
private <t> exporterchangeablewrapper<t> dolocalexport(final invoker<t> origininvoker, url providerurl) {
// 用于缓存的key,即提供者url
string key = getcachekey(origininvoker);
// 如果服务在缓存中不存在,则需要进行导出
return (exporterchangeablewrapper<t>) bounds.computeifabsent(key, s -> {
invoker<?> invokerdelegete = new invokerdelegate<>(origininvoker, providerurl);
// protocol成员变量在加载扩展类的时候会进行注入,通过spi或spring容器查找到对应的
// 通过spi注入时会注入自适应扩展类,通过传入的url动态决定使用哪个protocol
return new exporterchangeablewrapper<>((exporter<t>) protocol.export(invokerdelegete), origininvoker);
});
}
dubboprotocol.export
public <t> exporter<t> export(invoker<t> invoker) throws rpcexception {
url url = invoker.geturl();
// export service.
// 服务的key组成:servicegroup/servicename:serviceversion:port
string key = servicekey(url);
// 创建一个dubboexporter。用于封装一些引用
dubboexporter<t> exporter = new dubboexporter<t>(invoker, key, exportermap);
exportermap.put(key, exporter);
//export an stub service for dispatching event
// 本地存根导出事件分发服务
boolean isstubsupportevent = url.getparameter(constants.stub_event_key, constants.default_stub_event);
// 是否是回调服务
boolean iscallbackservice = url.getparameter(constants.is_callback_service, false);
if (isstubsupportevent && !iscallbackservice) {
string stubservicemethods = url.getparameter(constants.stub_event_methods_key);
if (stubservicemethods == null || stubservicemethods.length() == 0) {
if (logger.iswarnenabled()) {
logger.warn(new illegalstateexception("consumer [" + url.getparameter(constants.interface_key) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubservicemethodsmap.put(url.getservicekey(), stubservicemethods);
}
}
// 开启服务
openserver(url);
optimizeserialization(url);
return exporter;
}
openserver
private void openserver(url url) {
// find server.
string key = url.getaddress();
//client can export a service which's only for server to invoke
// 客户端也能够导出服务,不过客户端导出的服务只是给服务端调用的
boolean isserver = url.getparameter(constants.is_server_key, true);
if (isserver) {
// 双重检查锁
exchangeserver server = servermap.get(key);
if (server == null) {
synchronized (this) {
server = servermap.get(key);
if (server == null) {
servermap.put(key, createserver(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
createserver
private exchangeserver createserver(url url) {
url = urlbuilder.from(url)
// send readonly event when server closes, it's enabled by default
// 服务端关闭时发送只读事件
.addparameterifabsent(constants.channel_readonlyevent_sent_key, boolean.true.tostring())
// enable heartbeat by default
// 设置心跳间隔
.addparameterifabsent(constants.heartbeat_key, string.valueof(constants.default_heartbeat))
// 设置编码器
.addparameter(constants.codec_key, dubbocodec.name)
.build();
// 传输协议,默认是netty
string str = url.getparameter(constants.server_key, constants.default_remoting_server);
if (str != null && str.length() > 0 && !extensionloader.getextensionloader(transporter.class).hasextension(str)) {
throw new rpcexception("unsupported server type: " + str + ", url: " + url);
}
exchangeserver server;
try {
// 绑定端口,开启服务
server = exchangers.bind(url, requesthandler);
} catch (remotingexception e) {
throw new rpcexception("fail to start server(url: " + url + ") " + e.getmessage(), e);
}
// 客户端传输协议
str = url.getparameter(constants.client_key);
if (str != null && str.length() > 0) {
set<string> supportedtypes = extensionloader.getextensionloader(transporter.class).getsupportedextensions();
if (!supportedtypes.contains(str)) {
throw new rpcexception("unsupported client type: " + str);
}
}
return server;
}
exchangers.bind
public static exchangeserver bind(url url, exchangehandler handler) throws remotingexception {
if (url == null) {
throw new illegalargumentexception("url == null");
}
if (handler == null) {
throw new illegalargumentexception("handler == null");
}
// 编解码器设为exchange
url = url.addparameterifabsent(constants.codec_key, "exchange");
return getexchanger(url).bind(url, handler);
}
headerexchanger.bind
@override
public exchangeserver bind(url url, exchangehandler handler) throws remotingexception {
return new headerexchangeserver(transporters.bind(url, new decodehandler(new headerexchangehandler(handler))));
}
transporters.bind
public static server bind(url url, channelhandler... handlers) throws remotingexception {
if (url == null) {
throw new illegalargumentexception("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new illegalargumentexception("handlers == null");
}
channelhandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new channelhandlerdispatcher(handlers);
}
return gettransporter().bind(url, handler);
}
transporter
@spi("netty")
public interface transporter {
/**
* bind a server.
*
* @param url server url
* @param handler
* @return server
* @throws remotingexception
* @see org.apache.dubbo.remoting.transporters#bind(url, channelhandler...)
*/
// 依次根据server和transporter参数值决定扩展名
@adaptive({constants.server_key, constants.transporter_key})
server bind(url url, channelhandler handler) throws remotingexception;
/**
* connect to a server.
*
* @param url server url
* @param handler
* @return client
* @throws remotingexception
* @see org.apache.dubbo.remoting.transporters#connect(url, channelhandler...)
*/
@adaptive({constants.client_key, constants.transporter_key})
client connect(url url, channelhandler handler) throws remotingexception;
}
nettytransporter.bind
@override
public server bind(url url, channelhandler listener) throws remotingexception {
return new nettyserver(url, listener);
}
nettyserver
public abstractserver(url url, channelhandler handler) throws remotingexception {
super(url, handler);
localaddress = geturl().toinetsocketaddress();
string bindip = geturl().getparameter(constants.bind_ip_key, geturl().gethost());
int bindport = geturl().getparameter(constants.bind_port_key, geturl().getport());
if (url.getparameter(constants.anyhost_key, false) || netutils.isinvalidlocalhost(bindip)) {
bindip = constants.anyhost_value;
}
bindaddress = new inetsocketaddress(bindip, bindport);
this.accepts = url.getparameter(constants.accepts_key, constants.default_accepts);
// 空闲线程超时时间,毫秒
this.idletimeout = url.getparameter(constants.idle_timeout_key, constants.default_idle_timeout);
try {
doopen();
if (logger.isinfoenabled()) {
logger.info("start " + getclass().getsimplename() + " bind " + getbindaddress() + ", export " + getlocaladdress());
}
} catch (throwable t) {
throw new remotingexception(url.toinetsocketaddress(), null, "failed to bind " + getclass().getsimplename()
+ " on " + getlocaladdress() + ", cause: " + t.getmessage(), t);
}
//fixme replace this with better method
datastore datastore = extensionloader.getextensionloader(datastore.class).getdefaultextension();
executor = (executorservice) datastore.get(constants.executor_service_component_key, integer.tostring(url.getport()));
}
nettyserver.doopen
protected void doopen() throws throwable {
nettyhelper.setnettyloggerfactory();
// boss线程池,这里使用了newcachedthreadpool,如果需要就会创建新的线程,
executorservice boss = executors.newcachedthreadpool(new namedthreadfactory("nettyserverboss", true));
// worker线程池
executorservice worker = executors.newcachedthreadpool(new namedthreadfactory("nettyserverworker", true));
// 最大32核
channelfactory channelfactory = new nioserversocketchannelfactory(boss, worker, geturl().getpositiveparameter(constants.io_threads_key, constants.default_io_threads));
// netty启动类
bootstrap = new serverbootstrap(channelfactory);
final nettyhandler nettyhandler = new nettyhandler(geturl(), this);
channels = nettyhandler.getchannels();
// https://issues.jboss.org/browse/netty-365
// https://issues.jboss.org/browse/netty-379
// final timer timer = new hashedwheeltimer(new namedthreadfactory("nettyidletimer", true));
bootstrap.setoption("child.tcpnodelay", true);
bootstrap.setpipelinefactory(new channelpipelinefactory() {
@override
public channelpipeline getpipeline() {
nettycodecadapter adapter = new nettycodecadapter(getcodec(), geturl(), nettyserver.this);
channelpipeline pipeline = channels.pipeline();
/*int idletimeout = getidletimeout();
if (idletimeout > 10000) {
pipeline.addlast("timer", new idlestatehandler(timer, idletimeout / 1000, 0, 0));
}*/
pipeline.addlast("decoder", adapter.getdecoder());
pipeline.addlast("encoder", adapter.getencoder());
pipeline.addlast("handler", nettyhandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getbindaddress());
}
服务启动到这里就告一段落,netty的部分就不再展开,不属于dubbo框架的内容。
代码中基本上能抽象成接口的都抽象出来,扩展性是大大增强了,但是要想弄明白框架的整体架构就得多花点时间消化消化了。dubboprotocol.requesthandler
实际上从socket接收到字节数组怎么被解析为invocation,这中间还有很长的调用链,通过代理模式进行层层封装,这块逻辑还不太懂,留着以后慢慢研究。服务注册
以zookeeper为例,其实就是在zookeeper上创建对应的路径。当然不仅仅是这么简单,其中还有重试,失败回退等逻辑,这里不再细说,目的就是知道大概的原理。
如对本文有疑问, 点击进行留言回复!!
网友评论